[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/13680


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r80605670
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -33,134 +37,213 @@
   // The offset of the global buffer where we start to write this array.
   private int startingOffset;
 
-  public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
-// We need 4 bytes to store numElements and 4 bytes each element to 
store offset.
-final int fixedSize = 4 + 4 * numElements;
+  // The number of elements in this array
+  private int numElements;
+
+  private int headerInBytes;
+
+  private void assertIndexIsValid(int index) {
+assert index >= 0 : "index (" + index + ") should >= 0";
+assert index < numElements : "index (" + index + ") should < " + 
numElements;
+  }
+
+  public void initialize(BufferHolder holder, int numElements, int 
elementSize) {
+// We need 8 bytes to store numElements in header
+this.numElements = numElements;
+this.headerInBytes = calculateHeaderPortionInBytes(numElements);
 
 this.holder = holder;
 this.startingOffset = holder.cursor;
 
-holder.grow(fixedSize);
-Platform.putInt(holder.buffer, holder.cursor, numElements);
-holder.cursor += fixedSize;
+// Grows the global buffer ahead for header and fixed size data.
+int fixedPartInBytes =
+  ByteArrayMethods.roundNumberOfBytesToNearestWord(elementSize * 
numElements);
+holder.grow(headerInBytes + fixedPartInBytes);
+
+// Write numElements and clear out null bits to header
+Platform.putLong(holder.buffer, startingOffset, numElements);
+for (int i = 8; i < headerInBytes; i += 8) {
+  Platform.putLong(holder.buffer, startingOffset + i, 0L);
+}
+
+// fill 0 into reminder part of 8-bytes alignment in unsafe array
+for (int i = elementSize * numElements; i < fixedPartInBytes; i++) {
+  Platform.putByte(holder.buffer, startingOffset + headerInBytes + i, 
(byte) 0);
+}
+holder.cursor += (headerInBytes + fixedPartInBytes);
+  }
+
+  private void zeroOutPaddingBytes(int numBytes) {
+if ((numBytes & 0x07) > 0) {
+  Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 
3), 0L);
+}
+  }
+
+  private long getElementOffset(int ordinal, int elementSize) {
+return startingOffset + headerInBytes + ordinal * elementSize;
+  }
+
+  public void setOffsetAndSize(int ordinal, long currentCursor, long size) 
{
+assertIndexIsValid(ordinal);
+final long relativeOffset = currentCursor - startingOffset;
+final long offsetAndSize = (relativeOffset << 32) | size;
 
-// Grows the global buffer ahead for fixed size data.
-holder.grow(fixedElementSize * numElements);
+write(ordinal, offsetAndSize);
   }
 
-  private long getElementOffset(int ordinal) {
-return startingOffset + 4 + 4 * ordinal;
+  private void setNullBit(int ordinal) {
+assertIndexIsValid(ordinal);
+BitSetMethods.set(holder.buffer, startingOffset + 8, ordinal);
   }
 
-  public void setNullAt(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-// Writes negative offset value to represent null element.
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
-relativeOffset);
+  public void setNullBoolean(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putBoolean(holder.buffer, getElementOffset(ordinal, 1), 
false);
   }
 
-  public void setOffset(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
relativeOffset);
+  public void setNullByte(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), (byte)0);
   }
 
+  public void setNullShort(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), 
(short)0);
+  }
+
+  public void setNullInt(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), (int)0);
+  }
+
+  public void setNullLong(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putLong(holder.buffer, 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r80605631
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -33,134 +37,213 @@
   // The offset of the global buffer where we start to write this array.
   private int startingOffset;
 
-  public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
-// We need 4 bytes to store numElements and 4 bytes each element to 
store offset.
-final int fixedSize = 4 + 4 * numElements;
+  // The number of elements in this array
+  private int numElements;
+
+  private int headerInBytes;
+
+  private void assertIndexIsValid(int index) {
+assert index >= 0 : "index (" + index + ") should >= 0";
+assert index < numElements : "index (" + index + ") should < " + 
numElements;
+  }
+
+  public void initialize(BufferHolder holder, int numElements, int 
elementSize) {
+// We need 8 bytes to store numElements in header
+this.numElements = numElements;
+this.headerInBytes = calculateHeaderPortionInBytes(numElements);
 
 this.holder = holder;
 this.startingOffset = holder.cursor;
 
-holder.grow(fixedSize);
-Platform.putInt(holder.buffer, holder.cursor, numElements);
-holder.cursor += fixedSize;
+// Grows the global buffer ahead for header and fixed size data.
+int fixedPartInBytes =
+  ByteArrayMethods.roundNumberOfBytesToNearestWord(elementSize * 
numElements);
+holder.grow(headerInBytes + fixedPartInBytes);
+
+// Write numElements and clear out null bits to header
+Platform.putLong(holder.buffer, startingOffset, numElements);
+for (int i = 8; i < headerInBytes; i += 8) {
+  Platform.putLong(holder.buffer, startingOffset + i, 0L);
+}
+
+// fill 0 into reminder part of 8-bytes alignment in unsafe array
+for (int i = elementSize * numElements; i < fixedPartInBytes; i++) {
+  Platform.putByte(holder.buffer, startingOffset + headerInBytes + i, 
(byte) 0);
+}
+holder.cursor += (headerInBytes + fixedPartInBytes);
+  }
+
+  private void zeroOutPaddingBytes(int numBytes) {
+if ((numBytes & 0x07) > 0) {
+  Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 
3), 0L);
+}
+  }
+
+  private long getElementOffset(int ordinal, int elementSize) {
+return startingOffset + headerInBytes + ordinal * elementSize;
+  }
+
+  public void setOffsetAndSize(int ordinal, long currentCursor, long size) 
{
+assertIndexIsValid(ordinal);
+final long relativeOffset = currentCursor - startingOffset;
+final long offsetAndSize = (relativeOffset << 32) | size;
 
-// Grows the global buffer ahead for fixed size data.
-holder.grow(fixedElementSize * numElements);
+write(ordinal, offsetAndSize);
   }
 
-  private long getElementOffset(int ordinal) {
-return startingOffset + 4 + 4 * ordinal;
+  private void setNullBit(int ordinal) {
+assertIndexIsValid(ordinal);
+BitSetMethods.set(holder.buffer, startingOffset + 8, ordinal);
   }
 
-  public void setNullAt(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-// Writes negative offset value to represent null element.
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
-relativeOffset);
+  public void setNullBoolean(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putBoolean(holder.buffer, getElementOffset(ordinal, 1), 
false);
   }
 
-  public void setOffset(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
relativeOffset);
+  public void setNullByte(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), (byte)0);
   }
 
+  public void setNullShort(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), 
(short)0);
+  }
+
+  public void setNullInt(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), (int)0);
+  }
+
+  public void setNullLong(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putLong(holder.buffer, 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79806173
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -33,134 +37,213 @@
   // The offset of the global buffer where we start to write this array.
   private int startingOffset;
 
-  public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
-// We need 4 bytes to store numElements and 4 bytes each element to 
store offset.
-final int fixedSize = 4 + 4 * numElements;
+  // The number of elements in this array
+  private int numElements;
+
+  private int headerInBytes;
+
+  private void assertIndexIsValid(int index) {
+assert index >= 0 : "index (" + index + ") should >= 0";
+assert index < numElements : "index (" + index + ") should < " + 
numElements;
+  }
+
+  public void initialize(BufferHolder holder, int numElements, int 
elementSize) {
+// We need 8 bytes to store numElements in header
+this.numElements = numElements;
+this.headerInBytes = calculateHeaderPortionInBytes(numElements);
 
 this.holder = holder;
 this.startingOffset = holder.cursor;
 
-holder.grow(fixedSize);
-Platform.putInt(holder.buffer, holder.cursor, numElements);
-holder.cursor += fixedSize;
+// Grows the global buffer ahead for header and fixed size data.
+int fixedPartInBytes =
+  ByteArrayMethods.roundNumberOfBytesToNearestWord(elementSize * 
numElements);
+holder.grow(headerInBytes + fixedPartInBytes);
+
+// Write numElements and clear out null bits to header
+Platform.putLong(holder.buffer, startingOffset, numElements);
+for (int i = 8; i < headerInBytes; i += 8) {
+  Platform.putLong(holder.buffer, startingOffset + i, 0L);
+}
+
+// fill 0 into reminder part of 8-bytes alignment in unsafe array
+for (int i = elementSize * numElements; i < fixedPartInBytes; i++) {
+  Platform.putByte(holder.buffer, startingOffset + headerInBytes + i, 
(byte) 0);
+}
+holder.cursor += (headerInBytes + fixedPartInBytes);
+  }
+
+  private void zeroOutPaddingBytes(int numBytes) {
+if ((numBytes & 0x07) > 0) {
+  Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 
3), 0L);
+}
+  }
+
+  private long getElementOffset(int ordinal, int elementSize) {
+return startingOffset + headerInBytes + ordinal * elementSize;
+  }
+
+  public void setOffsetAndSize(int ordinal, long currentCursor, long size) 
{
+assertIndexIsValid(ordinal);
+final long relativeOffset = currentCursor - startingOffset;
+final long offsetAndSize = (relativeOffset << 32) | size;
 
-// Grows the global buffer ahead for fixed size data.
-holder.grow(fixedElementSize * numElements);
+write(ordinal, offsetAndSize);
   }
 
-  private long getElementOffset(int ordinal) {
-return startingOffset + 4 + 4 * ordinal;
+  private void setNullBit(int ordinal) {
+assertIndexIsValid(ordinal);
+BitSetMethods.set(holder.buffer, startingOffset + 8, ordinal);
   }
 
-  public void setNullAt(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-// Writes negative offset value to represent null element.
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
-relativeOffset);
+  public void setNullBoolean(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putBoolean(holder.buffer, getElementOffset(ordinal, 1), 
false);
   }
 
-  public void setOffset(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
relativeOffset);
+  public void setNullByte(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), (byte)0);
   }
 
+  public void setNullShort(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), 
(short)0);
+  }
+
+  public void setNullInt(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), (int)0);
+  }
+
+  public void setNullLong(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79806095
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -33,134 +37,213 @@
   // The offset of the global buffer where we start to write this array.
   private int startingOffset;
 
-  public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
-// We need 4 bytes to store numElements and 4 bytes each element to 
store offset.
-final int fixedSize = 4 + 4 * numElements;
+  // The number of elements in this array
+  private int numElements;
+
+  private int headerInBytes;
+
+  private void assertIndexIsValid(int index) {
+assert index >= 0 : "index (" + index + ") should >= 0";
+assert index < numElements : "index (" + index + ") should < " + 
numElements;
+  }
+
+  public void initialize(BufferHolder holder, int numElements, int 
elementSize) {
+// We need 8 bytes to store numElements in header
+this.numElements = numElements;
+this.headerInBytes = calculateHeaderPortionInBytes(numElements);
 
 this.holder = holder;
 this.startingOffset = holder.cursor;
 
-holder.grow(fixedSize);
-Platform.putInt(holder.buffer, holder.cursor, numElements);
-holder.cursor += fixedSize;
+// Grows the global buffer ahead for header and fixed size data.
+int fixedPartInBytes =
+  ByteArrayMethods.roundNumberOfBytesToNearestWord(elementSize * 
numElements);
+holder.grow(headerInBytes + fixedPartInBytes);
+
+// Write numElements and clear out null bits to header
+Platform.putLong(holder.buffer, startingOffset, numElements);
+for (int i = 8; i < headerInBytes; i += 8) {
+  Platform.putLong(holder.buffer, startingOffset + i, 0L);
+}
+
+// fill 0 into reminder part of 8-bytes alignment in unsafe array
+for (int i = elementSize * numElements; i < fixedPartInBytes; i++) {
+  Platform.putByte(holder.buffer, startingOffset + headerInBytes + i, 
(byte) 0);
+}
+holder.cursor += (headerInBytes + fixedPartInBytes);
+  }
+
+  private void zeroOutPaddingBytes(int numBytes) {
+if ((numBytes & 0x07) > 0) {
+  Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 
3), 0L);
+}
+  }
+
+  private long getElementOffset(int ordinal, int elementSize) {
+return startingOffset + headerInBytes + ordinal * elementSize;
+  }
+
+  public void setOffsetAndSize(int ordinal, long currentCursor, long size) 
{
+assertIndexIsValid(ordinal);
+final long relativeOffset = currentCursor - startingOffset;
+final long offsetAndSize = (relativeOffset << 32) | size;
 
-// Grows the global buffer ahead for fixed size data.
-holder.grow(fixedElementSize * numElements);
+write(ordinal, offsetAndSize);
   }
 
-  private long getElementOffset(int ordinal) {
-return startingOffset + 4 + 4 * ordinal;
+  private void setNullBit(int ordinal) {
+assertIndexIsValid(ordinal);
+BitSetMethods.set(holder.buffer, startingOffset + 8, ordinal);
   }
 
-  public void setNullAt(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-// Writes negative offset value to represent null element.
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
-relativeOffset);
+  public void setNullBoolean(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putBoolean(holder.buffer, getElementOffset(ordinal, 1), 
false);
   }
 
-  public void setOffset(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
relativeOffset);
+  public void setNullByte(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), (byte)0);
   }
 
+  public void setNullShort(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), 
(short)0);
+  }
+
+  public void setNullInt(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), (int)0);
+  }
+
+  public void setNullLong(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79654723
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -690,6 +690,7 @@ object ScalaReflection extends ScalaReflection {
 Schema(DecimalType.BigIntDecimal, nullable = true)
   case t if t <:< localTypeOf[scala.math.BigInt] =>
 Schema(DecimalType.BigIntDecimal, nullable = true)
+  case t if t <:< localTypeOf[CalendarInterval] => Schema(CalendarIntervalType, nullable = true)
--- End diff --

is it still needed?





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79654687
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -33,134 +37,213 @@
   // The offset of the global buffer where we start to write this array.
   private int startingOffset;
 
-  public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
-// We need 4 bytes to store numElements and 4 bytes each element to 
store offset.
-final int fixedSize = 4 + 4 * numElements;
+  // The number of elements in this array
+  private int numElements;
+
+  private int headerInBytes;
+
+  private void assertIndexIsValid(int index) {
+assert index >= 0 : "index (" + index + ") should >= 0";
+assert index < numElements : "index (" + index + ") should < " + 
numElements;
+  }
+
+  public void initialize(BufferHolder holder, int numElements, int 
elementSize) {
+// We need 8 bytes to store numElements in header
+this.numElements = numElements;
+this.headerInBytes = calculateHeaderPortionInBytes(numElements);
 
 this.holder = holder;
 this.startingOffset = holder.cursor;
 
-holder.grow(fixedSize);
-Platform.putInt(holder.buffer, holder.cursor, numElements);
-holder.cursor += fixedSize;
+// Grows the global buffer ahead for header and fixed size data.
+int fixedPartInBytes =
+  ByteArrayMethods.roundNumberOfBytesToNearestWord(elementSize * 
numElements);
+holder.grow(headerInBytes + fixedPartInBytes);
+
+// Write numElements and clear out null bits to header
+Platform.putLong(holder.buffer, startingOffset, numElements);
+for (int i = 8; i < headerInBytes; i += 8) {
+  Platform.putLong(holder.buffer, startingOffset + i, 0L);
+}
+
+// fill 0 into reminder part of 8-bytes alignment in unsafe array
+for (int i = elementSize * numElements; i < fixedPartInBytes; i++) {
+  Platform.putByte(holder.buffer, startingOffset + headerInBytes + i, 
(byte) 0);
+}
+holder.cursor += (headerInBytes + fixedPartInBytes);
+  }
+
+  private void zeroOutPaddingBytes(int numBytes) {
+if ((numBytes & 0x07) > 0) {
+  Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 
3), 0L);
+}
+  }
+
+  private long getElementOffset(int ordinal, int elementSize) {
+return startingOffset + headerInBytes + ordinal * elementSize;
+  }
+
+  public void setOffsetAndSize(int ordinal, long currentCursor, long size) 
{
+assertIndexIsValid(ordinal);
+final long relativeOffset = currentCursor - startingOffset;
+final long offsetAndSize = (relativeOffset << 32) | size;
 
-// Grows the global buffer ahead for fixed size data.
-holder.grow(fixedElementSize * numElements);
+write(ordinal, offsetAndSize);
   }
 
-  private long getElementOffset(int ordinal) {
-return startingOffset + 4 + 4 * ordinal;
+  private void setNullBit(int ordinal) {
+assertIndexIsValid(ordinal);
+BitSetMethods.set(holder.buffer, startingOffset + 8, ordinal);
   }
 
-  public void setNullAt(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-// Writes negative offset value to represent null element.
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
-relativeOffset);
+  public void setNullBoolean(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putBoolean(holder.buffer, getElementOffset(ordinal, 1), 
false);
   }
 
-  public void setOffset(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
relativeOffset);
+  public void setNullByte(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), (byte)0);
   }
 
+  public void setNullShort(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), 
(short)0);
+  }
+
+  public void setNullInt(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), (int)0);
+  }
+
+  public void setNullLong(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79653637
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -43,10 +43,11 @@
  *
 * In the `values or offset` region, we store the content of elements. For fields that hold
 * fixed-length primitive types, such as long, double, or int, we store the value directly
- * in the field. For fixed-length portion, each is word-aligned. For fields with non-primitive or
- * variable-length values, we store a relative offset (w.r.t. the base address of the array)
- * that points to the beginning of the variable-length field and length (they are combined into
- * a long). For variable length portion, each is aligned to 8-byte boundaries.
+ * in the field. For fixed-length portion, each element value (even for byte) is word-aligned.
--- End diff --

ok it's not confusing now but wrong... not each element value, but the whole region is word-aligned.
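
For illustration, a minimal sketch of the sizing arithmetic this distinction implies. The header formula is an assumption (8 bytes for numElements plus one 8-byte word per 64 null bits, matching the calculateHeaderPortionInBytes call quoted elsewhere in this thread), not text from the PR:

// Sketch only: three 1-byte elements are packed back to back; only the
// fixed-length region as a whole is padded up to the next 8-byte word,
// so individual elements are not word-aligned.
int numElements = 3;
int elementSize = 1;                                              // byte elements
int headerInBytes = 8 + ((numElements + 63) / 64) * 8;            // assumed formula: 16 here
int fixedPartInBytes = ((elementSize * numElements + 7) / 8) * 8; // 3 bytes -> 8 bytes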





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79614500
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -199,6 +200,7 @@ public void write(int ordinal, Decimal input, int precision, int scale) {
 Platform.copyMemory(
   bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, numBytes);
 write(ordinal, ((long)(holder.cursor - startingOffset) << 32) | ((long) numBytes));
+setOffsetAndSize(ordinal, holder.cursor, (long)numBytes);
--- End diff --

are you writing the offset and region twice?
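
(The quoted hunk keeps the old inline write(ordinal, ...) call and adds setOffsetAndSize(...). Assuming setOffsetAndSize packs the same ((cursor - startingOffset) << 32) | size word, as the UnsafeArrayWriter hunk elsewhere in this thread suggests, the two calls store the same value. A minimal sketch of the deduplicated tail, as an assumption rather than the merged code:)

// Sketch only: copy the decimal bytes, then record the offset and size once.
Platform.copyMemory(
  bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, numBytes);
setOffsetAndSize(ordinal, holder.cursor, (long) numBytes);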





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79614390
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -141,12 +141,18 @@ class UnsafeArraySuite extends SparkFunSuite {
   }
 }
 
-val unsafeInterval = ExpressionEncoder[Array[CalendarInterval]].resolveAndBind().
-  toRow(calenderintervalArray).getArray(0)
-assert(unsafeInterval.isInstanceOf[UnsafeArrayData])
-assert(unsafeInterval.numElements == calenderintervalArray.length)
-calenderintervalArray.zipWithIndex.map { case (e, i) =>
-  assert(unsafeInterval.getInterval(i) == e)
+Seq(calenderintervalArray).map { calendarArray =>
--- End diff --

why `Seq`?





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79614191
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java
 ---
@@ -67,9 +67,9 @@ public UnsafeMapData() {
  public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
 // Read the numBytes of key array from the first 8 bytes.
 final long keyArraySize = Platform.getLong(baseObject, baseOffset);
-final long valueArraySize = sizeInBytes - keyArraySize - 8;
 assert keyArraySize >= 0 : "keyArraySize (" + keyArraySize + ") should >= 0";
 assert keyArraySize <= Integer.MAX_VALUE : "keyArraySize (" + keyArraySize + ") should <= Integer.MAX_VALUE";
+final long valueArraySize = sizeInBytes - (int)keyArraySize - 8;
--- End diff --

we can declare it as int directly.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79613964
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -38,15 +38,15 @@
  *
 * The `numElements` is 8 bytes storing the number of elements of this array.
 *
- * In the `null bits` region, we store 1 bit per element, represents whether a element has null
- * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 8-byte boundaries.
+ * In the `null bits` region, we store 1 bit per element, represents whether an element is null
+ * Its total size is ceil(numElements / 8) bytes, and it is aligned to 8-byte boundaries.
 *
 * In the `values or offset` region, we store the content of elements. For fields that hold
 * fixed-length primitive types, such as long, double, or int, we store the value directly
- * in the field. For fields with non-primitive or variable-length values, we store a relative
 * offset (w.r.t. the base address of the array) that points to the beginning of
 * the variable-length field and length (they are combined into a long). For variable length
- * portion, each is aligned to 8-byte boundaries.
+ * in the field. For fixed-length portion, each is word-aligned. For fields with non-primitive or
--- End diff --

`each is word-aligned` this is confusing, sounds like each element value (even for byte) is word-aligned.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79566920
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+/* 4 + 4 * count // Use this expression for SPARK-15962 */
+UnsafeArrayData.calculateHeaderPortionInBytes(count)
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intUnsafeArray = intEncoder.toRow(intPrimitiveArray).getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0
+var i = 0
+while (i < len) {
+  sum += intUnsafeArray.getInt(i)
+  i += 1
+}
+n += 1
+  }
+}
+
+val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble 
}
+val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+val doubleUnsafeArray = 
doubleEncoder.toRow(doublePrimitiveArray).getArray(0)
+val readDoubleArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = doubleUnsafeArray.numElements
+var sum = 0.0
+var i = 0
+while (i < len) {
+  sum += doubleUnsafeArray.getDouble(i)
+  i += 1
+}
+n += 1
+  }
+}
+
+val benchmark = new Benchmark("Read UnsafeArrayData", count * iters)
+benchmark.addCase("Int")(readIntArray)
+benchmark.addCase("Double")(readDoubleArray)
+benchmark.run
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4
+Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
+
+Read UnsafeArrayData:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Int                                            279 /  294        600.4           1.7       1.0X
+Double                                         296 /  303        567.0           1.8       0.9X
+*/
+  }
+
+  def writeUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+var intTotalLength: Int = 0
+val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val writeIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+intTotalLength += 
intEncoder.toRow(intPrimitiveArray).getArray(0).numElements()
+n += 1
+  }
+}
+
+var doubleTotalLength: Int = 0
+val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble 
}
+val doubleEncoder = 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79560822
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -341,63 +328,123 @@ public UnsafeArrayData copy() {
 return arrayCopy;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(int[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 8) {
-  throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
-"it's too big.");
-}
+  @Override
+  public boolean[] toBooleanArray() {
+int size = numElements();
+boolean[] values = new boolean[size];
+Platform.copyMemory(
+  baseObject, elementOffset, values, Platform.BOOLEAN_ARRAY_OFFSET, 
size);
+return values;
+  }
 
-final int offsetRegionSize = 4 * arr.length;
-final int valueRegionSize = 4 * arr.length;
-final int totalSize = 4 + offsetRegionSize + valueRegionSize;
-final byte[] data = new byte[totalSize];
+  @Override
+  public byte[] toByteArray() {
+int size = numElements();
+byte[] values = new byte[size];
+Platform.copyMemory(
+  baseObject, elementOffset, values, Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, arr.length);
+  @Override
+  public short[] toShortArray() {
+if (numElements > Integer.MAX_VALUE) {
+  throw new UnsupportedOperationException("Cannot convert this unsafe 
array to array as " +
+  "it's too big.");
+}
+int size = (int)numElements;
+short[] values = new short[size];
+Platform.copyMemory(
+  baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, size 
* 2);
+return values;
+  }
 
-int offsetPosition = Platform.BYTE_ARRAY_OFFSET + 4;
-int valueOffset = 4 + offsetRegionSize;
-for (int i = 0; i < arr.length; i++) {
-  Platform.putInt(data, offsetPosition, valueOffset);
-  offsetPosition += 4;
-  valueOffset += 4;
+  @Override
+  public int[] toIntArray() {
+int size = numElements();
+int[] values = new int[size];
+Platform.copyMemory(
+  baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, size * 
4);
+return values;
+  }
+
+  @Override
+  public long[] toLongArray() {
+if (numElements > Integer.MAX_VALUE) {
+  throw new UnsupportedOperationException("Cannot convert this unsafe 
array to array as " +
+  "it's too big.");
 }
+int size = (int)numElements;
+long[] values = new long[size];
+Platform.copyMemory(
+  baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, size 
* 8);
+return values;
+  }
 
-Platform.copyMemory(arr, Platform.INT_ARRAY_OFFSET, data,
-  Platform.BYTE_ARRAY_OFFSET + 4 + offsetRegionSize, valueRegionSize);
+  @Override
+  public float[] toFloatArray() {
+int size = numElements();
+float[] values = new float[size];
+Platform.copyMemory(
+  baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, size 
* 4);
+return values;
+  }
 
-UnsafeArrayData result = new UnsafeArrayData();
-result.pointTo(data, Platform.BYTE_ARRAY_OFFSET, totalSize);
-return result;
+  @Override
+  public double[] toDoubleArray() {
+int size = numElements();
+double[] values = new double[size];
+Platform.copyMemory(
+  baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, 
size * 8);
+return values;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(double[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 12) {
+  private static UnsafeArrayData fromPrimitiveArray(
+   Object arr, int offset, int length, int elementSize) {
+final long headerInBytes = calculateHeaderPortionInBytes(length);
+final long valueRegionInBytes = elementSize * length;
+final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
+if (totalSizeInLongs * 8 > Integer.MAX_VALUE) {
--- End diff --

how about `totalSizeInLongs > Integer.MAX_VALUE / 8`? `totalSizeInLongs * 8` may overflow
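
A sketch of the guard in the suggested form, using the names from the quoted hunk (whether the original multiplication can actually overflow depends on the final operand types, so this shows the defensive variant the reviewer asks about, not necessarily the merged code):

// Sketch only: divide the constant instead of multiplying the operand, so the
// comparison itself can never overflow for a non-negative totalSizeInLongs.
final long headerInBytes = calculateHeaderPortionInBytes(length);
final long valueRegionInBytes = elementSize * length;
final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
  throw new UnsupportedOperationException("Cannot convert this array to unsafe format as " +
    "it's too big.");
}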



[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79563496
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -33,134 +36,213 @@
   // The offset of the global buffer where we start to write this array.
   private int startingOffset;
 
-  public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
-// We need 4 bytes to store numElements and 4 bytes each element to 
store offset.
-final int fixedSize = 4 + 4 * numElements;
+  // The number of elements in this array
+  private int numElements;
+
+  private int headerInBytes;
+
+  private void assertIndexIsValid(int index) {
+assert index >= 0 : "index (" + index + ") should >= 0";
+assert index < numElements : "index (" + index + ") should < " + 
numElements;
+  }
+
+  public void initialize(BufferHolder holder, int numElements, int 
elementSize) {
+// We need 8 bytes to store numElements in header
+this.numElements = numElements;
+this.headerInBytes = calculateHeaderPortionInBytes(numElements);
 
 this.holder = holder;
 this.startingOffset = holder.cursor;
 
-holder.grow(fixedSize);
-Platform.putInt(holder.buffer, holder.cursor, numElements);
-holder.cursor += fixedSize;
+// Grows the global buffer ahead for header and fixed size data.
+int fixedPartInBytes = ((elementSize * numElements + 7) / 8) * 8;
+holder.grow(headerInBytes + fixedPartInBytes);
+
+// Write numElements and clear out null bits to header
+Platform.putLong(holder.buffer, startingOffset, numElements);
+for (int i = 8; i < headerInBytes; i += 8) {
+  Platform.putLong(holder.buffer, startingOffset + i, 0L);
+}
+
+// fill 0 into reminder part of 8-bytes alignment in unsafe array
+if ((fixedPartInBytes - elementSize * numElements) != 0) {
+  for (int i = elementSize * numElements; i < fixedPartInBytes; i++) {
+Platform.putByte(holder.buffer, startingOffset + headerInBytes + 
i, (byte) 0);
+  }
+}
+holder.cursor += (headerInBytes + fixedPartInBytes);
+  }
+
+  private void zeroOutPaddingBytes(int numBytes) {
+if ((numBytes & 0x07) > 0) {
+  Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 
3), 0L);
+}
+  }
+
+  private long getElementOffset(int ordinal, int elementSize) {
+return startingOffset + headerInBytes + ordinal * elementSize;
+  }
+
+  public void setOffsetAndSize(int ordinal, long currentCursor, long size) 
{
+final long relativeOffset = currentCursor - startingOffset;
+final long offsetAndSize = (relativeOffset << 32) | size;
 
-// Grows the global buffer ahead for fixed size data.
-holder.grow(fixedElementSize * numElements);
+write(ordinal, offsetAndSize);
   }
 
-  private long getElementOffset(int ordinal) {
-return startingOffset + 4 + 4 * ordinal;
+  private void setNullBit(int ordinal) {
+assertIndexIsValid(ordinal);
+BitSetMethods.set(holder.buffer, startingOffset + 8, ordinal);
   }
 
-  public void setNullAt(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-// Writes negative offset value to represent null element.
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
-relativeOffset);
+  public void setNullBoolean(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putBoolean(holder.buffer, getElementOffset(ordinal, 1), 
false);
   }
 
-  public void setOffset(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
relativeOffset);
+  public void setNullByte(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), (byte)0);
   }
 
+  public void setNullShort(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), 
(short)0);
+  }
+
+  public void setNullInt(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), (int)0);
+  }
+
+  public void setNullLong(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79561292
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java
 ---
@@ -65,14 +65,15 @@ public UnsafeMapData() {
* @param sizeInBytes the size of this map's backing data, in bytes
*/
  public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
-// Read the numBytes of key array from the first 4 bytes.
-final int keyArraySize = Platform.getInt(baseObject, baseOffset);
-final int valueArraySize = sizeInBytes - keyArraySize - 4;
+// Read the numBytes of key array from the first 8 bytes.
+final long keyArraySize = Platform.getLong(baseObject, baseOffset);
+final long valueArraySize = sizeInBytes - keyArraySize - 8;
--- End diff --

check the `keyArraySize` first, then we can just write
`final int valueArraySize = sizeInBytes - (int) keyArraySize - 8;`
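
A minimal sketch of that ordering, combining this suggestion with the asserts quoted in the other UnsafeMapData hunk above (an illustration, not the merged code):

// Sketch only: bounds-check the long read from the header first; after that the
// narrowing cast is safe and valueArraySize can be declared as an int directly.
final long keyArraySize = Platform.getLong(baseObject, baseOffset);
assert keyArraySize >= 0 : "keyArraySize (" + keyArraySize + ") should >= 0";
assert keyArraySize <= Integer.MAX_VALUE :
  "keyArraySize (" + keyArraySize + ") should <= Integer.MAX_VALUE";
final int valueArraySize = sizeInBytes - (int) keyArraySize - 8;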





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79564513
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -101,6 +101,8 @@ object ScalaReflection extends ScalaReflection {
   case t if t <:< definitions.ShortTpe => classOf[Array[Short]]
--- End diff --

do we still need to change this file?





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79566721
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+/* 4 + 4 * count // Use this expression for SPARK-15962 */
+UnsafeArrayData.calculateHeaderPortionInBytes(count)
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intUnsafeArray = intEncoder.toRow(intPrimitiveArray).getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0
+var i = 0
+while (i < len) {
+  sum += intUnsafeArray.getInt(i)
+  i += 1
+}
+n += 1
+  }
+}
+
+val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble 
}
+val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+val doubleUnsafeArray = 
doubleEncoder.toRow(doublePrimitiveArray).getArray(0)
+val readDoubleArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = doubleUnsafeArray.numElements
+var sum = 0.0
+var i = 0
+while (i < len) {
+  sum += doubleUnsafeArray.getDouble(i)
+  i += 1
+}
+n += 1
+  }
+}
+
+val benchmark = new Benchmark("Read UnsafeArrayData", count * iters)
+benchmark.addCase("Int")(readIntArray)
+benchmark.addCase("Double")(readDoubleArray)
+benchmark.run
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4
+Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
+
+Read UnsafeArrayData:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Int                                            279 /  294        600.4           1.7       1.0X
+Double                                         296 /  303        567.0           1.8       0.9X
+*/
+  }
+
+  def writeUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+var intTotalLength: Int = 0
+val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val writeIntArray = { i: Int =>
--- End diff --

let's move `intTotalLength` into this closure, or it may hurt performance



[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79562241
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -33,134 +36,213 @@
   // The offset of the global buffer where we start to write this array.
   private int startingOffset;
 
-  public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
-// We need 4 bytes to store numElements and 4 bytes each element to 
store offset.
-final int fixedSize = 4 + 4 * numElements;
+  // The number of elements in this array
+  private int numElements;
+
+  private int headerInBytes;
+
+  private void assertIndexIsValid(int index) {
+assert index >= 0 : "index (" + index + ") should >= 0";
+assert index < numElements : "index (" + index + ") should < " + 
numElements;
+  }
+
+  public void initialize(BufferHolder holder, int numElements, int 
elementSize) {
+// We need 8 bytes to store numElements in header
+this.numElements = numElements;
+this.headerInBytes = calculateHeaderPortionInBytes(numElements);
 
 this.holder = holder;
 this.startingOffset = holder.cursor;
 
-holder.grow(fixedSize);
-Platform.putInt(holder.buffer, holder.cursor, numElements);
-holder.cursor += fixedSize;
+// Grows the global buffer ahead for header and fixed size data.
+int fixedPartInBytes = ((elementSize * numElements + 7) / 8) * 8;
+holder.grow(headerInBytes + fixedPartInBytes);
+
+// Write numElements and clear out null bits to header
+Platform.putLong(holder.buffer, startingOffset, numElements);
+for (int i = 8; i < headerInBytes; i += 8) {
+  Platform.putLong(holder.buffer, startingOffset + i, 0L);
+}
+
+// fill 0 into reminder part of 8-bytes alignment in unsafe array
+if ((fixedPartInBytes - elementSize * numElements) != 0) {
--- End diff --

the `if` is not needed
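
A sketch of the simplification: when elementSize * numElements is already a multiple of 8, the loop below runs zero iterations, so the surrounding `if` adds nothing.

// Sketch only: zero out any padding up to the next 8-byte boundary; the loop
// bound already makes this a no-op when there is no padding to clear.
for (int i = elementSize * numElements; i < fixedPartInBytes; i++) {
  Platform.putByte(holder.buffer, startingOffset + headerInBytes + i, (byte) 0);
}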





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79566797
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+/* 4 + 4 * count // Use this expression for SPARK-15962 */
+UnsafeArrayData.calculateHeaderPortionInBytes(count)
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intUnsafeArray = intEncoder.toRow(intPrimitiveArray).getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0
+var i = 0
+while (i < len) {
+  sum += intUnsafeArray.getInt(i)
+  i += 1
+}
+n += 1
+  }
+}
+
+val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble 
}
+val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+val doubleUnsafeArray = 
doubleEncoder.toRow(doublePrimitiveArray).getArray(0)
+val readDoubleArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = doubleUnsafeArray.numElements
+var sum = 0.0
+var i = 0
+while (i < len) {
+  sum += doubleUnsafeArray.getDouble(i)
+  i += 1
+}
+n += 1
+  }
+}
+
+val benchmark = new Benchmark("Read UnsafeArrayData", count * iters)
+benchmark.addCase("Int")(readIntArray)
+benchmark.addCase("Double")(readDoubleArray)
+benchmark.run
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4
+Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
+
+Read UnsafeArrayData:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Int                                            279 /  294        600.4           1.7       1.0X
+Double                                         296 /  303        567.0           1.8       0.9X
+*/
+  }
+
+  def writeUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+var intTotalLength: Int = 0
+val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val writeIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+intTotalLength += 
intEncoder.toRow(intPrimitiveArray).getArray(0).numElements()
+n += 1
+  }
+}
+
+var doubleTotalLength: Int = 0
+val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble 
}
+val doubleEncoder = 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79561758
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,55 +25,58 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each array has four parts:
+ *   [numElements][null bits][values or offset][variable length 
portion]
  *
- * The `numElements` is 4 bytes storing the number of elements of this 
array.
+ * The `numElements` is 8 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte boundaries.
  *
- * In the `values` region, we store the content of elements. As we can get 
length info, so elements
- * can be variable-length.
+ * In the `values or offset` region, we store the content of 
elements. For fields that hold
+ * fixed-length primitive types, such as long, double, or int, we store 
the value directly
--- End diff --

we should also mention that the fixed-length values portion is word-aligned
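
For example, the class doc could gain a sentence along these lines (illustrative wording only, not the committed text):

```java
// Suggested doc addition, not actual source:
// The fixed-length `values or offset` region occupies elementSize * numElements bytes rounded
// up to a multiple of 8; the trailing padding bytes are zeroed out, so the variable-length
// portion that follows always starts at an 8-byte aligned offset.
```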





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79565280
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -18,27 +18,187 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, 
RowEncoder}
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 class UnsafeArraySuite extends SparkFunSuite {
 
-  test("from primitive int array") {
-val array = Array(1, 10, 100)
-val unsafe = UnsafeArrayData.fromPrimitiveArray(array)
-assert(unsafe.numElements == 3)
-assert(unsafe.getSizeInBytes == 4 + 4 * 3 + 4 * 3)
-assert(unsafe.getInt(0) == 1)
-assert(unsafe.getInt(1) == 10)
-assert(unsafe.getInt(2) == 100)
+  val booleanArray = Array(false, true)
+  val shortArray = Array(1.toShort, 10.toShort, 100.toShort)
+  val intArray = Array(1, 10, 100)
+  val longArray = Array(1.toLong, 10.toLong, 100.toLong)
+  val floatArray = Array(1.1.toFloat, 2.2.toFloat, 3.3.toFloat)
+  val doubleArray = Array(1.1, 2.2, 3.3)
+  val stringArray = Array("1", "10", "100")
+  val dateArray = Array(
--- End diff --

@davies, I don't think we need to test date and timestamp; they are handled the same way as int and long by `UnsafeArrayWriter`





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79560048
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -341,63 +328,123 @@ public UnsafeArrayData copy() {
 return arrayCopy;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(int[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 8) {
-  throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
-"it's too big.");
-}
+  @Override
+  public boolean[] toBooleanArray() {
+int size = numElements();
+boolean[] values = new boolean[size];
+Platform.copyMemory(
+  baseObject, elementOffset, values, Platform.BOOLEAN_ARRAY_OFFSET, 
size);
+return values;
+  }
 
-final int offsetRegionSize = 4 * arr.length;
-final int valueRegionSize = 4 * arr.length;
-final int totalSize = 4 + offsetRegionSize + valueRegionSize;
-final byte[] data = new byte[totalSize];
+  @Override
+  public byte[] toByteArray() {
+int size = numElements();
+byte[] values = new byte[size];
+Platform.copyMemory(
+  baseObject, elementOffset, values, Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, arr.length);
+  @Override
+  public short[] toShortArray() {
+if (numElements > Integer.MAX_VALUE) {
--- End diff --

the `numElements` field is of int type, so this check can never be true
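
A minimal sketch of the method with the dead guard dropped, assuming the surrounding fields shown in this diff (illustrative, not the final patch):

```java
@Override
public short[] toShortArray() {
  // numElements is an int, so a `numElements > Integer.MAX_VALUE` check can never be true
  short[] values = new short[numElements];
  Platform.copyMemory(
    baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2L);
  return values;
}
```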





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79559789
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -341,63 +328,123 @@ public UnsafeArrayData copy() {
 return arrayCopy;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(int[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 8) {
-  throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
-"it's too big.");
-}
+  @Override
+  public boolean[] toBooleanArray() {
+int size = numElements();
--- End diff --

we already have the `numElements` field; there is no need to create a new local variable to hold it.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79561024
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java
 ---
@@ -65,14 +65,15 @@ public UnsafeMapData() {
* @param sizeInBytes the size of this map's backing data, in bytes
*/
   public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) 
{
-// Read the numBytes of key array from the first 4 bytes.
-final int keyArraySize = Platform.getInt(baseObject, baseOffset);
-final int valueArraySize = sizeInBytes - keyArraySize - 4;
+// Read the numBytes of key array from the first 8 bytes.
--- End diff --

we should also update the class doc of `UnsafeMapData` to reflect this 
change
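
For instance, the updated doc could read roughly as follows (a suggested wording, not the committed text):

```java
/**
 * An Unsafe implementation of Map which is backed by raw memory instead of Java objects.
 *
 * Currently we just use 2 UnsafeArrayData to represent it, with an extra 8 bytes at the
 * head to indicate the number of bytes of the unsafe key array.
 */
```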





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79559615
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,55 +25,58 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each array has four parts:
+ *   [numElements][null bits][values or offset][variable length 
portion]
  *
- * The `numElements` is 4 bytes storing the number of elements of this 
array.
+ * The `numElements` is 8 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte boundaries.
--- End diff --

nit: in `...and  it is...` there is an extra space between `and` and `it`





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79559429
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,55 +25,58 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each array has four parts:
+ *   [numElements][null bits][values or offset][variable length 
portion]
  *
- * The `numElements` is 4 bytes storing the number of elements of this 
array.
+ * The `numElements` is 8 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
--- End diff --

`an element is null`





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79563210
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -33,134 +36,213 @@
   // The offset of the global buffer where we start to write this array.
   private int startingOffset;
 
-  public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
-// We need 4 bytes to store numElements and 4 bytes each element to 
store offset.
-final int fixedSize = 4 + 4 * numElements;
+  // The number of elements in this array
+  private int numElements;
+
+  private int headerInBytes;
+
+  private void assertIndexIsValid(int index) {
+assert index >= 0 : "index (" + index + ") should >= 0";
+assert index < numElements : "index (" + index + ") should < " + 
numElements;
+  }
+
+  public void initialize(BufferHolder holder, int numElements, int 
elementSize) {
+// We need 8 bytes to store numElements in header
+this.numElements = numElements;
+this.headerInBytes = calculateHeaderPortionInBytes(numElements);
 
 this.holder = holder;
 this.startingOffset = holder.cursor;
 
-holder.grow(fixedSize);
-Platform.putInt(holder.buffer, holder.cursor, numElements);
-holder.cursor += fixedSize;
+// Grows the global buffer ahead for header and fixed size data.
+int fixedPartInBytes = ((elementSize * numElements + 7) / 8) * 8;
+holder.grow(headerInBytes + fixedPartInBytes);
+
+// Write numElements and clear out null bits to header
+Platform.putLong(holder.buffer, startingOffset, numElements);
+for (int i = 8; i < headerInBytes; i += 8) {
+  Platform.putLong(holder.buffer, startingOffset + i, 0L);
+}
+
+// fill 0 into reminder part of 8-bytes alignment in unsafe array
+if ((fixedPartInBytes - elementSize * numElements) != 0) {
+  for (int i = elementSize * numElements; i < fixedPartInBytes; i++) {
+Platform.putByte(holder.buffer, startingOffset + headerInBytes + 
i, (byte) 0);
+  }
+}
+holder.cursor += (headerInBytes + fixedPartInBytes);
+  }
+
+  private void zeroOutPaddingBytes(int numBytes) {
+if ((numBytes & 0x07) > 0) {
+  Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 
3), 0L);
+}
+  }
+
+  private long getElementOffset(int ordinal, int elementSize) {
+return startingOffset + headerInBytes + ordinal * elementSize;
+  }
+
+  public void setOffsetAndSize(int ordinal, long currentCursor, long size) 
{
+final long relativeOffset = currentCursor - startingOffset;
+final long offsetAndSize = (relativeOffset << 32) | size;
 
-// Grows the global buffer ahead for fixed size data.
-holder.grow(fixedElementSize * numElements);
+write(ordinal, offsetAndSize);
   }
 
-  private long getElementOffset(int ordinal) {
-return startingOffset + 4 + 4 * ordinal;
+  private void setNullBit(int ordinal) {
+assertIndexIsValid(ordinal);
+BitSetMethods.set(holder.buffer, startingOffset + 8, ordinal);
   }
 
-  public void setNullAt(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-// Writes negative offset value to represent null element.
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
-relativeOffset);
+  public void setNullBoolean(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putBoolean(holder.buffer, getElementOffset(ordinal, 1), 
false);
   }
 
-  public void setOffset(int ordinal) {
-final int relativeOffset = holder.cursor - startingOffset;
-Platform.putInt(holder.buffer, getElementOffset(ordinal), 
relativeOffset);
+  public void setNullByte(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), (byte)0);
   }
 
+  public void setNullShort(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), 
(short)0);
+  }
+
+  public void setNullInt(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), (int)0);
+  }
+
+  public void setNullLong(int ordinal) {
+setNullBit(ordinal);
+// put zero into the corresponding field when set null
+

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79562772
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -33,134 +36,213 @@
   // The offset of the global buffer where we start to write this array.
   private int startingOffset;
 
-  public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
-// We need 4 bytes to store numElements and 4 bytes each element to 
store offset.
-final int fixedSize = 4 + 4 * numElements;
+  // The number of elements in this array
+  private int numElements;
+
+  private int headerInBytes;
+
+  private void assertIndexIsValid(int index) {
+assert index >= 0 : "index (" + index + ") should >= 0";
+assert index < numElements : "index (" + index + ") should < " + 
numElements;
+  }
+
+  public void initialize(BufferHolder holder, int numElements, int 
elementSize) {
+// We need 8 bytes to store numElements in header
+this.numElements = numElements;
+this.headerInBytes = calculateHeaderPortionInBytes(numElements);
 
 this.holder = holder;
 this.startingOffset = holder.cursor;
 
-holder.grow(fixedSize);
-Platform.putInt(holder.buffer, holder.cursor, numElements);
-holder.cursor += fixedSize;
+// Grows the global buffer ahead for header and fixed size data.
+int fixedPartInBytes = ((elementSize * numElements + 7) / 8) * 8;
+holder.grow(headerInBytes + fixedPartInBytes);
+
+// Write numElements and clear out null bits to header
+Platform.putLong(holder.buffer, startingOffset, numElements);
+for (int i = 8; i < headerInBytes; i += 8) {
+  Platform.putLong(holder.buffer, startingOffset + i, 0L);
+}
+
+// fill 0 into reminder part of 8-bytes alignment in unsafe array
+if ((fixedPartInBytes - elementSize * numElements) != 0) {
+  for (int i = elementSize * numElements; i < fixedPartInBytes; i++) {
+Platform.putByte(holder.buffer, startingOffset + headerInBytes + 
i, (byte) 0);
+  }
+}
+holder.cursor += (headerInBytes + fixedPartInBytes);
+  }
+
+  private void zeroOutPaddingBytes(int numBytes) {
+if ((numBytes & 0x07) > 0) {
+  Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 
3), 0L);
+}
+  }
+
+  private long getElementOffset(int ordinal, int elementSize) {
+return startingOffset + headerInBytes + ordinal * elementSize;
+  }
+
+  public void setOffsetAndSize(int ordinal, long currentCursor, long size) 
{
--- End diff --

similar to `setNullBit`, should we also call `assertIndexIsValid(ordinal)` 
in this method?
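
A minimal sketch of that change, mirroring `setNullBit` (illustrative, not the committed code):

```java
public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
  assertIndexIsValid(ordinal);  // validate the ordinal before writing, as setNullBit does
  final long relativeOffset = currentCursor - startingOffset;
  final long offsetAndSize = (relativeOffset << 32) | size;
  write(ordinal, offsetAndSize);
}
```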





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-09-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r79565586
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -18,27 +18,187 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, 
RowEncoder}
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 class UnsafeArraySuite extends SparkFunSuite {
 
-  test("from primitive int array") {
-val array = Array(1, 10, 100)
-val unsafe = UnsafeArrayData.fromPrimitiveArray(array)
-assert(unsafe.numElements == 3)
-assert(unsafe.getSizeInBytes == 4 + 4 * 3 + 4 * 3)
-assert(unsafe.getInt(0) == 1)
-assert(unsafe.getInt(1) == 10)
-assert(unsafe.getInt(2) == 100)
+  val booleanArray = Array(false, true)
+  val shortArray = Array(1.toShort, 10.toShort, 100.toShort)
+  val intArray = Array(1, 10, 100)
+  val longArray = Array(1.toLong, 10.toLong, 100.toLong)
+  val floatArray = Array(1.1.toFloat, 2.2.toFloat, 3.3.toFloat)
+  val doubleArray = Array(1.1, 2.2, 3.3)
+  val stringArray = Array("1", "10", "100")
+  val dateArray = Array(
+DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1")).get,
+DateTimeUtils.stringToDate(UTF8String.fromString("2016-7-26")).get)
+  val timestampArray = Array(
+DateTimeUtils.stringToTimestamp(UTF8String.fromString("1970-1-1 
00:00:00")).get,
+DateTimeUtils.stringToTimestamp(UTF8String.fromString("2016-7-26 
00:00:00")).get)
+  val decimalArray4_1 = Array(
+BigDecimal("123.4").setScale(1, BigDecimal.RoundingMode.FLOOR),
+BigDecimal("567.8").setScale(1, BigDecimal.RoundingMode.FLOOR))
+  val decimalArray20_20 = Array(
+BigDecimal("1.2345678901234567890123456").setScale(21, 
BigDecimal.RoundingMode.FLOOR),
+BigDecimal("2.3456789012345678901234567").setScale(21, 
BigDecimal.RoundingMode.FLOOR))
+
+  val calenderintervalArray = Array(new CalendarInterval(3, 321), new 
CalendarInterval(1, 123))
+
+  val intMultiDimArray = Array(Array(1), Array(2, 20), Array(3, 30, 300))
+  val doubleMultiDimArray = Array(
+Array(1.1, 11.1), Array(2.2, 22.2, 222.2), Array(3.3, 33.3, 333.3, 
.3))
+
+  test("read array") {
+val unsafeBoolean = ExpressionEncoder[Array[Boolean]].resolveAndBind().
+  toRow(booleanArray).getArray(0)
+assert(unsafeBoolean.isInstanceOf[UnsafeArrayData])
+assert(unsafeBoolean.numElements == booleanArray.length)
+booleanArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeBoolean.getBoolean(i) == e)
+}
+
+val unsafeShort = ExpressionEncoder[Array[Short]].resolveAndBind().
+  toRow(shortArray).getArray(0)
+assert(unsafeShort.isInstanceOf[UnsafeArrayData])
+assert(unsafeShort.numElements == shortArray.length)
+shortArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeShort.getShort(i) == e)
+}
+
+val unsafeInt = ExpressionEncoder[Array[Int]].resolveAndBind().
+  toRow(intArray).getArray(0)
+assert(unsafeInt.isInstanceOf[UnsafeArrayData])
+assert(unsafeInt.numElements == intArray.length)
+intArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeInt.getInt(i) == e)
+}
+
+val unsafeLong = ExpressionEncoder[Array[Long]].resolveAndBind().
+  toRow(longArray).getArray(0)
+assert(unsafeLong.isInstanceOf[UnsafeArrayData])
+assert(unsafeLong.numElements == longArray.length)
+longArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeLong.getLong(i) == e)
+}
+
+val unsafeFloat = ExpressionEncoder[Array[Float]].resolveAndBind().
+  toRow(floatArray).getArray(0)
+assert(unsafeFloat.isInstanceOf[UnsafeArrayData])
+assert(unsafeFloat.numElements == floatArray.length)
+floatArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeFloat.getFloat(i) == e)
+}
+
+val unsafeDouble = ExpressionEncoder[Array[Double]].resolveAndBind().
+  toRow(doubleArray).getArray(0)
+assert(unsafeDouble.isInstanceOf[UnsafeArrayData])
+assert(unsafeDouble.numElements == doubleArray.length)
+doubleArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeDouble.getDouble(i) == e)
+}
+
+val unsafeString = ExpressionEncoder[Array[String]].resolveAndBind().
+  toRow(stringArray).getArray(0)
+assert(unsafeString.isInstanceOf[UnsafeArrayData])
+

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r74881945
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,55 +25,57 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each array has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
- * The `numElements` is 4 bytes storing the number of elements of this 
array.
+ * The `numElements` is 8 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte word boundaries.
  *
- * In the `values` region, we store the content of elements. As we can get 
length info, so elements
- * can be variable-length.
+ * In the `values or offset` region, we store the content of elements. For 
fields that hold
+ * fixed-length primitive types, such as long, double, or int, we store 
the value directly
+ * in the field. For fields with non-primitive or variable-length values, 
we store a relative
+ * offset (w.r.t. the base address of the array) that points to the 
beginning of
+ * the variable-length field into int. It can only be calculated by 
knowing the total bytes of
--- End diff --

I see, makes sense, thanks for the explanation!





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-15 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r74798477
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,55 +25,57 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each array has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
- * The `numElements` is 4 bytes storing the number of elements of this 
array.
+ * The `numElements` is 8 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte word boundaries.
  *
- * In the `values` region, we store the content of elements. As we can get 
length info, so elements
- * can be variable-length.
+ * In the `values or offset` region, we store the content of elements. For 
fields that hold
+ * fixed-length primitive types, such as long, double, or int, we store 
the value directly
+ * in the field. For fields with non-primitive or variable-length values, 
we store a relative
+ * offset (w.r.t. the base address of the array) that points to the 
beginning of
+ * the variable-length field into int. It can only be calculated by 
knowing the total bytes of
--- End diff --

@cloud-fan It only handles the case where the length is not word-aligned, but not the case where the offset is not word-aligned; otherwise Platform.getLong() could be expensive.

The new format does not perform worse than before, but since we are touching this part, it's better to get it right now rather than re-design the format again in the future.
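
To illustrate the point: if each variable-length element's size is rounded up to a whole word before the next offset is computed, every element offset stays 8-byte aligned, so word-wise reads such as Platform.getLong() can stay on their fast path. A rough sketch of that rounding (the helper name is hypothetical):

```java
// Hypothetical helper, shown only to illustrate the alignment argument.
static int roundUpToWordBoundary(int numBytes) {
  return ((numBytes + 7) / 8) * 8;  // next multiple of 8, e.g. 13 -> 16
}
```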





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r74175381
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -18,27 +18,131 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
+import org.apache.spark.unsafe.Platform
 
 class UnsafeArraySuite extends SparkFunSuite {
 
-  test("from primitive int array") {
-val array = Array(1, 10, 100)
-val unsafe = UnsafeArrayData.fromPrimitiveArray(array)
-assert(unsafe.numElements == 3)
-assert(unsafe.getSizeInBytes == 4 + 4 * 3 + 4 * 3)
-assert(unsafe.getInt(0) == 1)
-assert(unsafe.getInt(1) == 10)
-assert(unsafe.getInt(2) == 100)
+  val booleanArray = Array(false, true)
+  val shortArray = Array(1.toShort, 10.toShort, 100.toShort)
+  val intArray = Array(1, 10, 100)
+  val longArray = Array(1.toLong, 10.toLong, 100.toLong)
+  val floatArray = Array(1.1.toFloat, 2.2.toFloat, 3.3.toFloat)
+  val doubleArray = Array(1.1, 2.2, 3.3)
+  val stringArray = Array("1", "10", "100")
--- End diff --

Thanks. It worked.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r74065336
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -18,27 +18,131 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
+import org.apache.spark.unsafe.Platform
 
 class UnsafeArraySuite extends SparkFunSuite {
 
-  test("from primitive int array") {
-val array = Array(1, 10, 100)
-val unsafe = UnsafeArrayData.fromPrimitiveArray(array)
-assert(unsafe.numElements == 3)
-assert(unsafe.getSizeInBytes == 4 + 4 * 3 + 4 * 3)
-assert(unsafe.getInt(0) == 1)
-assert(unsafe.getInt(1) == 10)
-assert(unsafe.getInt(2) == 100)
+  val booleanArray = Array(false, true)
+  val shortArray = Array(1.toShort, 10.toShort, 100.toShort)
+  val intArray = Array(1, 10, 100)
+  val longArray = Array(1.toLong, 10.toLong, 100.toLong)
+  val floatArray = Array(1.1.toFloat, 2.2.toFloat, 3.3.toFloat)
+  val doubleArray = Array(1.1, 2.2, 3.3)
+  val stringArray = Array("1", "10", "100")
--- End diff --

we can use `RowEncoder` for this case:
```
val schema = new StructType().add("array", ArrayType(DecimalType(20, 10)))
val encoder = RowEncoder(schema).resolveAndBind()
val externalRow = Row(new GenericArrayData(Array(Decimal("23213.131231"))))
val unsafeDecimalArray = encoder.toRow(externalRow)
```





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r74038625
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -18,27 +18,131 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
+import org.apache.spark.unsafe.Platform
 
 class UnsafeArraySuite extends SparkFunSuite {
 
-  test("from primitive int array") {
-val array = Array(1, 10, 100)
-val unsafe = UnsafeArrayData.fromPrimitiveArray(array)
-assert(unsafe.numElements == 3)
-assert(unsafe.getSizeInBytes == 4 + 4 * 3 + 4 * 3)
-assert(unsafe.getInt(0) == 1)
-assert(unsafe.getInt(1) == 10)
-assert(unsafe.getInt(2) == 100)
+  val booleanArray = Array(false, true)
+  val shortArray = Array(1.toShort, 10.toShort, 100.toShort)
+  val intArray = Array(1, 10, 100)
+  val longArray = Array(1.toLong, 10.toLong, 100.toLong)
+  val floatArray = Array(1.1.toFloat, 2.2.toFloat, 3.3.toFloat)
+  val doubleArray = Array(1.1, 2.2, 3.3)
+  val stringArray = Array("1", "10", "100")
--- End diff --

@davies thanks. I understand what I should do. While I specified the schema as follows, the generated code still uses 38 and 18.
When I checked code generation, [this 
code](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala#L300)
 gets data type from serializer instead of schema. If I am correct, [this 
code](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L61)
 generates a serializer in `ExpressionEncoder[T]` based on `[T]`, not a schema.
When I replaced 
['DecimalType.SYSTEM_DEFAULT'](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L511
) with 'DecimalType(4,1)', the generated code uses 4 and 1.

Would it be possible to let me know how to specify a specific dataType by using a schema?

```
  val decimalArray = Array(BigDecimal("123").setScale(1, BigDecimal.RoundingMode.FLOOR))

  test("read array") {
    val unsafeDecimal = ExpressionEncoder[Array[BigDecimal]].copy(schema = new StructType()
        .add("value", ArrayType(DataTypes.createDecimalType(4, 1), true), true), true)
      .resolveAndBind().toRow(decimalArray).getArray(0)
    decimalArray.zipWithIndex.map { case (e, i) =>
      assert(unsafeDecimal.getDecimal(i, e.precision, e.scale) == e)
    }
  }
```





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73979120
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,55 +25,57 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each array has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
- * The `numElements` is 4 bytes storing the number of elements of this 
array.
+ * The `numElements` is 8 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte word boundaries.
  *
- * In the `values` region, we store the content of elements. As we can get 
length info, so elements
- * can be variable-length.
+ * In the `values or offset` region, we store the content of elements. For 
fields that hold
+ * fixed-length primitive types, such as long, double, or int, we store 
the value directly
+ * in the field. For fields with non-primitive or variable-length values, 
we store a relative
+ * offset (w.r.t. the base address of the array) that points to the 
beginning of
+ * the variable-length field into int. It can only be calculated by 
knowing the total bytes of
--- End diff --

I checked the code, `UTF8String.equals` will call 
`ByteArrayMethods.arrayEquals`:
```
  public static boolean arrayEquals(
      Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
    int i = 0;
    while (i <= length - 8) {
      if (Platform.getLong(leftBase, leftOffset + i) !=
          Platform.getLong(rightBase, rightOffset + i)) {
        return false;
      }
      i += 8;
    }
    while (i < length) {
      if (Platform.getByte(leftBase, leftOffset + i) !=
          Platform.getByte(rightBase, rightOffset + i)) {
        return false;
      }
      i += 1;
    }
    return true;
  }
```
It handles the un-aligned case; I think word-alignment is only about performance here.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73951262
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -18,27 +18,131 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
+import org.apache.spark.unsafe.Platform
 
 class UnsafeArraySuite extends SparkFunSuite {
 
-  test("from primitive int array") {
-val array = Array(1, 10, 100)
-val unsafe = UnsafeArrayData.fromPrimitiveArray(array)
-assert(unsafe.numElements == 3)
-assert(unsafe.getSizeInBytes == 4 + 4 * 3 + 4 * 3)
-assert(unsafe.getInt(0) == 1)
-assert(unsafe.getInt(1) == 10)
-assert(unsafe.getInt(2) == 100)
+  val booleanArray = Array(false, true)
+  val shortArray = Array(1.toShort, 10.toShort, 100.toShort)
+  val intArray = Array(1, 10, 100)
+  val longArray = Array(1.toLong, 10.toLong, 100.toLong)
+  val floatArray = Array(1.1.toFloat, 2.2.toFloat, 3.3.toFloat)
+  val doubleArray = Array(1.1, 2.2, 3.3)
+  val stringArray = Array("1", "10", "100")
--- End diff --

When we infer the schema from a given BigDecimal class, we use SYSTEM_DEFAULT, because we don't know the actual precision and scale. Could you specify the schema in the test?





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-08 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73949824
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -18,27 +18,131 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
+import org.apache.spark.unsafe.Platform
 
 class UnsafeArraySuite extends SparkFunSuite {
 
-  test("from primitive int array") {
-val array = Array(1, 10, 100)
-val unsafe = UnsafeArrayData.fromPrimitiveArray(array)
-assert(unsafe.numElements == 3)
-assert(unsafe.getSizeInBytes == 4 + 4 * 3 + 4 * 3)
-assert(unsafe.getInt(0) == 1)
-assert(unsafe.getInt(1) == 10)
-assert(unsafe.getInt(2) == 100)
+  val booleanArray = Array(false, true)
+  val shortArray = Array(1.toShort, 10.toShort, 100.toShort)
+  val intArray = Array(1, 10, 100)
+  val longArray = Array(1.toLong, 10.toLong, 100.toLong)
+  val floatArray = Array(1.1.toFloat, 2.2.toFloat, 3.3.toFloat)
+  val doubleArray = Array(1.1, 2.2, 3.3)
+  val stringArray = Array("1", "10", "100")
--- End diff --

@davies could you do me a favor? To be honest, I am not familiar with the 
implementation of `Decimal`. Could you please let me know where I am wrong?

I wrote the following test suite and got the generated code for the projection. 
In the generated code, lines 90 and 91 always use 38 and 18 for precision and 
scale, while I pass a different ```BigDecimal``` in the test suite. This code is 
generated 
[here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala#L218)
 and 
[here](https://github.com/apache/spark/blob/23c58653f900bfb71ef2b3186a95ad2562c33969/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala).
 I suspect that these places refer to 
[DecimalType.SYSTEM_DEFAULT](https://github.com/apache/spark/blob/d5911d1173fe0872f21cae6c47abf8ff479345a4/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala#L110),
 and `DecimalType.SYSTEM_DEFAULT` uses 38 and 18.
Should we always use these values (38 and 18)? Would it be possible to let 
us know what other test cases use ```DecimalType``` in 
```UnsafeArrayData```?

```scala
  val decimalArray = Array(BigDecimal("123").setScale(1, 
BigDecimal.RoundingMode.FLOOR))
  test("read array") {
val unsafeDecimal = 
ExpressionEncoder[Array[BigDecimal]].resolveAndBind().
  toRow(decimalArray).getArray(0)
assert(unsafeDecimal.isInstanceOf[UnsafeArrayData])
assert(unsafeDecimal.numElements == decimalArray.length)
decimalArray.zipWithIndex.map { case (e, i) =>
  print(s"e: $e, ${e.precision}, ${e.scale}\n")
  assert(unsafeDecimal.getDecimal(i, e.precision, e.scale) == e)
}
  }
```
```java
/* 031 */   public UnsafeRow apply(InternalRow i) {
/* 032 */ holder.reset();
/* 033 */
/* 034 */ rowWriter.zeroOutNullBytes();
/* 035 */
/* 036 */
/* 037 */ boolean isNull1 = i.isNullAt(0);
/* 038 */ scala.math.BigDecimal[] value1 = isNull1 ? null : 
((scala.math.BigDecimal[])i.get(0, null));
/* 039 */ ArrayData value = null;
/* 040 */
/* 041 */ if (!isNull1) {
/* 042 */
/* 043 */   Decimal[] convertedArray = null;
/* 044 */   int dataLength = value1.length;
/* 045 */   convertedArray = new Decimal[dataLength];
/* 046 */
/* 047 */   int loopIndex = 0;
/* 048 */   while (loopIndex < dataLength) {
/* 049 */ MapObjects_loopValue0 = (scala.math.BigDecimal) 
(value1[loopIndex]);
/* 050 */ MapObjects_loopIsNull1 = MapObjects_loopValue0 == null;
/* 051 */
/* 052 */
/* 053 */ boolean isNull2 = MapObjects_loopIsNull1;
/* 054 */ final Decimal value2 = isNull2 ? null : 
org.apache.spark.sql.types.Decimal.apply(MapObjects_loopValue0);
/* 055 */ isNull2 = value2 == null;
/* 056 */ if (isNull2) {
/* 057 */   convertedArray[loopIndex] = null;
/* 058 */ } else {
/* 059 */   convertedArray[loopIndex] = value2;
/* 060 */ }
/* 061 */
/* 062 */ loopIndex += 1;
/* 063 */   }
/* 064 */
/* 065 */   value = new 
org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray);
/* 066 */ }
/* 067 */ if (isNull1) {
/* 068 */   rowWriter.setNullAt(0);
/* 069 */ } else {
/* 070 */   // Remember the current cursor so that we can calculate how 
many bytes are
/* 071 */   // written 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73926983
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,55 +25,57 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each array has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
- * The `numElements` is 4 bytes storing the number of elements of this 
array.
+ * The `numElements` is 8 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte word boundaries.
  *
- * In the `values` region, we store the content of elements. As we can get 
length info, so elements
- * can be variable-length.
+ * In the `values or offset` region, we store the content of elements. For 
fields that hold
+ * fixed-length primitive types, such as long, double, or int, we store 
the value directly
+ * in the field. For fields with non-primitive or variable-length values, 
we store a relative
+ * offset (w.r.t. the base address of the array) that points to the 
beginning of
+ * the variable-length field into int. It can only be calculated by 
knowing the total bytes of
--- End diff --

Since the length of a UTF8String can't be aligned to a word, and we are using 
getLong() to compare two UTF8Strings, I think we should make them word-aligned 
in the array; otherwise we would have to copy the bytes in getUTF8String(). cc 
@rxin 
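
A sketch of what making them word-aligned in the array could mean on the writer side (an assumption, not code from the PR): round each variable-length element's size up to a multiple of 8 before advancing the cursor, so the next element starts on a word boundary, and zero out the padding bytes:

```scala
import org.apache.spark.unsafe.array.ByteArrayMethods

// A 13-byte UTF8String would occupy 16 bytes in the values region, with the
// 3 trailing padding bytes zeroed out, so getLong()-based comparison never
// reads past an unpadded tail.
val numBytes = 13
val paddedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)  // 16
```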





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73814104
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,55 +25,57 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each array has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
- * The `numElements` is 4 bytes storing the number of elements of this 
array.
+ * The `numElements` is 8 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte word boundaries.
  *
- * In the `values` region, we store the content of elements. As we can get 
length info, so elements
- * can be variable-length.
+ * In the `values or offset` region, we store the content of elements. For 
fields that hold
+ * fixed-length primitive types, such as long, double, or int, we store 
the value directly
+ * in the field. For fields with non-primitive or variable-length values, 
we store a relative
+ * offset (w.r.t. the base address of the array) that points to the 
beginning of
+ * the variable-length field into int. It can only be calculated by 
knowing the total bytes of
--- End diff --

@davies, are you saying that we should make each variable-length element in 
the `variable length portion` word-aligned? Then we would need to follow 
`UnsafeRow` and store both offset and length in the `offset` region.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-06 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73795810
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -101,6 +101,8 @@ object ScalaReflection extends ScalaReflection {
   case t if t <:< definitions.ShortTpe => classOf[Array[Short]]
   case t if t <:< definitions.ByteTpe => classOf[Array[Byte]]
   case t if t <:< definitions.BooleanTpe => classOf[Array[Boolean]]
+  case t if t <:< localTypeOf[CalendarInterval] => 
classOf[Array[CalendarInterval]]
+  case t if t <:< localTypeOf[Decimal] => classOf[Array[Decimal]]
--- End diff --

When I added test cases for `CalendarInterval` and `Decimal`, I got the 
following cast exception without these changes. What is an appropriate way to 
fix this?

```java
org.apache.spark.sql.types.CalendarIntervalType$ cannot be cast to 
org.apache.spark.sql.types.ObjectType
java.lang.ClassCastException: 
org.apache.spark.sql.types.CalendarIntervalType$ cannot be cast to 
org.apache.spark.sql.types.ObjectType
at 
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$arrayClassFor(ScalaReflection.scala:108)
at 
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:82)
at 
org.apache.spark.sql.catalyst.ScalaReflection$.dataTypeFor(ScalaReflection.scala:63)
at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:53)
at 
org.apache.spark.sql.catalyst.util.UnsafeArraySuite$$anonfun$1.apply$mcV$sp(UnsafeArraySuite.scala:129)
at 
org.apache.spark.sql.catalyst.util.UnsafeArraySuite$$anonfun$1.apply(UnsafeArraySuite.scala:48)
at 
org.apache.spark.sql.catalyst.util.UnsafeArraySuite$$anonfun$1.apply(UnsafeArraySuite.scala:48)
at 
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:57)
at 
org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
at 
org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at 
org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
at 
org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at 
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
at org.scalatest.Suite$class.run(Suite.scala:1424)
at 
org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
at 
org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
at 
org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
at 
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:29)
at 
org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
at 
org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:29)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
at 
org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
at 
org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73779646
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -93,6 +102,38 @@ class UnsafeArraySuite extends SparkFunSuite {
   assert(unsafeString.getUTF8String(i).toString().equals(e))
 }
 
+val unsafeDate = ExpressionEncoder[Array[Int]].resolveAndBind().
+  toRow(dateArray).getArray(0)
+assert(unsafeDate.isInstanceOf[UnsafeArrayData])
+assert(unsafeDate.numElements == dateArray.length)
+dateArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeDate.get(i, DateType) == e)
+}
+
+val unsafeTimestamp = ExpressionEncoder[Array[Long]].resolveAndBind().
+  toRow(timestampArray).getArray(0)
+assert(unsafeTimestamp.isInstanceOf[UnsafeArrayData])
+assert(unsafeTimestamp.numElements == timestampArray.length)
+timestampArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeTimestamp.get(i, TimestampType) == e)
+}
+
+val unsafeDecimal = ExpressionEncoder[Array[Decimal]].resolveAndBind().
--- End diff --

The external type for decimal is `java.math.BigDecimal` or Scala's `BigDecimal`.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73779549
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -341,16 +343,20 @@ public UnsafeArrayData copy() {
 int size = numElements();
 byte[] values = new byte[size];
 Platform.copyMemory(
-  baseObject, baseOffset + headerInBytes, values, 
Platform.BYTE_ARRAY_OFFSET, size);
+  baseObject, elementOffset, values, Platform.BYTE_ARRAY_OFFSET, size);
 return values;
   }
 
   @Override
   public short[] toShortArray() {
-int size = numElements();
+if (numElements > Integer.MAX_VALUE) {
--- End diff --

`numElements` is an int, right?





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-05 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73776684
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
 ---
@@ -73,8 +73,8 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
 checkActualSize(BINARY, Array.fill[Byte](4)(0.toByte), 4 + 4)
 checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8)
 checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5)
-checkActualSize(ARRAY_TYPE, Array[Any](1), 16)
--- End diff --

It seems we have to keep `Any`. When I changed it to `Int`, I got the 
following cast error:

```java
[I cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
java.lang.ClassCastException: [I cannot be cast to 
org.apache.spark.sql.catalyst.util.ArrayData
at 
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:48)
at 
org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:236)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:34)
at 
org.apache.spark.sql.execution.columnar.ColumnTypeSuite$$anonfun$2.checkActualSize$1(ColumnTypeSuite.scala:60)
...
```





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-05 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73753275
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
  * The `numElements` is 4 bytes storing the number of elements of this 
array.
--- End diff --

Just change the format, not the API.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-05 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73750991
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
  * The `numElements` is 4 bytes storing the number of elements of this 
array.
--- End diff --

One question: do we change the return type of `numElements()` and 
`getSizeInBytes()` from `int` to `long`? Or do we keep them as `int` after 
making `numElements` `long`?
Changing these types would require rewriting a lot of declarations from 
`int` to `long`.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-05 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73745549
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
  * The `numElements` is 4 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte word boundaries.
  *
- * In the `values` region, we store the content of elements. As we can get 
length info, so elements
- * can be variable-length.
+ * In the `values or offset` region, we store the content of elements. For 
fields that hold
+ * fixed-length primitive types, such as long, double, or int, we store 
the value directly
+ * in the field. For fields with non-primitive or variable-length values, 
we store a relative
+ * offset (w.r.t. the base address of the array) that points to the 
beginning of
+ * the variable-length field into int. its length can be got by 
subtracting 2 adjacent offsets
--- End diff --

I think that `intMultiDimArray` and `doubleMultiDimArray` in 
`UnsafeArraySuite` cover these corner cases. They keep variable-length objects 
(an `Array` inside an `Array`). What do you think?
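
For reference, that test data looks roughly like the following (values are illustrative): the outer array's elements are themselves variable-length arrays, so the nested/var-length code path is exercised.

```scala
val intMultiDimArray = Array(Array(1), Array(2, 20), Array(3, 30, 300))
val doubleMultiDimArray = Array(
  Array(1.1, 11.1), Array(2.2, 22.2, 222.2), Array(3.3, 33.3, 333.3, 3333.3))
```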





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-05 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73727756
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
  * The `numElements` is 4 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte word boundaries.
  *
- * In the `values` region, we store the content of elements. As we can get 
length info, so elements
- * can be variable-length.
+ * In the `values or offset` region, we store the content of elements. For 
fields that hold
+ * fixed-length primitive types, such as long, double, or int, we store 
the value directly
+ * in the field. For fields with non-primitive or variable-length values, 
we store a relative
+ * offset (w.r.t. the base address of the array) that points to the 
beginning of
+ * the variable-length field into int. its length can be got by 
subtracting 2 adjacent offsets
--- End diff --

Thinking of an array of array of struct: if the length of the array is not 
aligned to 8 bytes, or the array does not guarantee that its elements are 
aligned to 8 bytes, then the offset of the struct cannot be aligned to 8 bytes, 
which breaks the assumption of UnsafeRow.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-05 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73727463
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
--- End diff --

Thinking of an array of array of struct, the offset of the inner struct has to 
be aligned to 8 bytes, otherwise it could have bad performance.

It's better to have a consistent design, especially while we are re-designing 
this.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73638020
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
  * The `numElements` is 4 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte word boundaries.
  *
- * In the `values` region, we store the content of elements. As we can get 
length info, so elements
- * can be variable-length.
+ * In the `values or offset` region, we store the content of elements. For 
fields that hold
+ * fixed-length primitive types, such as long, double, or int, we store 
the value directly
+ * in the field. For fields with non-primitive or variable-length values, 
we store a relative
+ * offset (w.r.t. the base address of the array) that points to the 
beginning of
+ * the variable-length field into int. its length can be got by 
subtracting 2 adjacent offsets
--- End diff --

`UnsafeArray` won't appear alone; when we write it into `UnsafeRow`, it 
will be aligned to 8 bytes by the row writer.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73627677
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
--- End diff --

The previous format isn't aligned either. I think it's OK because 
`UnsafeArray` won't appear alone, but always as a field of `UnsafeRow`, and we 
do align the whole content of `UnsafeArray` when we write it into `UnsafeRow`.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73594310
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
  * The `numElements` is 4 bytes storing the number of elements of this 
array.
--- End diff --

long sounds better.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73585439
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
  * The `numElements` is 4 bytes storing the number of elements of this 
array.
--- End diff --

I like this. Do we declare `numElements` as `long`? Or should we use only 
4 of the 8 bytes?





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73569234
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
  * The `numElements` is 4 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte word boundaries.
  *
- * In the `values` region, we store the content of elements. As we can get 
length info, so elements
- * can be variable-length.
+ * In the `values or offset` region, we store the content of elements. For 
fields that hold
+ * fixed-length primitive types, such as long, double, or int, we store 
the value directly
+ * in the field. For fields with non-primitive or variable-length values, 
we store a relative
+ * offset (w.r.t. the base address of the array) that points to the 
beginning of
+ * the variable-length field into int. its length can be got by 
subtracting 2 adjacent offsets
--- End diff --

We should have unit tests to cover these corner cases.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73569132
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
  * The `numElements` is 4 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte word boundaries.
  *
- * In the `values` region, we store the content of elements. As we can get 
length info, so elements
- * can be variable-length.
+ * In the `values or offset` region, we store the content of elements. For 
fields that hold
+ * fixed-length primitive types, such as long, double, or int, we store 
the value directly
+ * in the field. For fields with non-primitive or variable-length values, 
we store a relative
+ * offset (w.r.t. the base address of the array) that points to the 
beginning of
+ * the variable-length field into int. its length can be got by 
subtracting 2 adjacent offsets
--- End diff --

If the total length of the array is aligned to 8 bytes, we should have N+1 
offsets in order to know the length of the last element.

Another trick could be to store the end of each element instead of its offset, 
because we already know the offset of the first element (elementOffset + 4 * n), 
but that is a little bit confusing.
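
To spell out the arithmetic (a sketch of the offsets-only scheme under discussion, not the layout that ended up being committed): every length but the last comes from the next offset, and the last one needs the array's total size, which is exactly what the N+1-offsets or store-the-ends variants would avoid.

```scala
// Length of element i under the "one offset per element" scheme.
def elementLength(offsets: Array[Int], sizeInBytes: Int, i: Int): Int =
  if (i < offsets.length - 1) offsets(i + 1) - offsets(i)
  else sizeInBytes - offsets(i)   // last element: only known from the total size
```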





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73568470
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
--- End diff --

All four parts should be aligned to 8 bytes, and the total length of the array 
should be aligned to 8 bytes, to make nested arrays/structs work better (they 
are assumed to have an aligned address).
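
As a worked example of that fully word-aligned layout (the arithmetic is an assumption about the proposal, not quoted from the patch), an `Array(1, 10, 100)` of 4-byte ints would occupy:

```scala
def roundUpToWord(bytes: Int): Int = (bytes + 7) / 8 * 8

val numElementsPart = 8                          // 8-byte element count
val nullBitsPart    = roundUpToWord((3 + 7) / 8) // 1 null bit per element -> 8
val valuesPart      = roundUpToWord(3 * 4)       // 12 bytes of ints -> 16
val totalSize = numElementsPart + nullBitsPart + valuesPart  // 32, a multiple of 8
```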





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73567644
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -59,21 +65,16 @@
   // The 4-bytes header of `numElements` is also included.
   private int sizeInBytes;
 
-  public Object getBaseObject() { return baseObject; }
-  public long getBaseOffset() { return baseOffset; }
-  public int getSizeInBytes() { return sizeInBytes; }
+  /** The width of the null tracking bit set plus `numElements`, in bytes 
*/
+  private int headerInBytes;
--- End diff --

We use `baseOffset + headerInBytes` more often, so we could keep an 
`elementOffset` field instead of `headerInBytes`.
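
A hedged sketch of that suggestion (the class and field names here are made up): cache the absolute start of the values region once instead of recomputing `baseOffset + headerInBytes` on every element access.

```scala
import org.apache.spark.unsafe.Platform

class IntArrayView(baseObject: AnyRef, baseOffset: Long, headerInBytes: Int) {
  // Precomputed start of the values region.
  private val elementOffset: Long = baseOffset + headerInBytes

  def getInt(ordinal: Int): Int =
    Platform.getInt(baseObject, elementOffset + 4L * ordinal)
}
```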





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73567215
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
  * The `numElements` is 4 bytes storing the number of elements of this 
array.
  *
- * In the `offsets` region, we store 4 bytes per element, represents the 
relative offset (w.r.t. the
- * base address of the array) of this element in `values` region. We can 
get the length of this
- * element by subtracting next offset.
- * Note that offset can by negative which means this element is null.
+ * In the `null bits` region, we store 1 bit per element, represents 
whether a element has null
+ * Its total size is ceil(numElements / 8) bytes, and  it is aligned to 
8-byte word boundaries.
  *
- * In the `values` region, we store the content of elements. As we can get 
length info, so elements
- * can be variable-length.
+ * In the `values or offset` region, we store the content of elements. For 
fields that hold
+ * fixed-length primitive types, such as long, double, or int, we store 
the value directly
+ * in the field. For fields with non-primitive or variable-length values, 
we store a relative
+ * offset (w.r.t. the base address of the array) that points to the 
beginning of
+ * the variable-length field into int. its length can be got by 
subtracting 2 adjacent offsets
--- End diff --

The length of the last var-length object can only be calculated by knowing 
the total bytes of the array; could you also add a comment about this?





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73566559
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
  *
  * The `numElements` is 4 bytes storing the number of elements of this 
array.
--- End diff --

If the element type is Long/Double, we should make sure that the elements 
are word-aligned, so it's better to use 8 bytes for numElements.
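
A small sketch of how that could look (an assumption about the proposal, not the committed code): reserve a full word for the element count so the values region starts word-aligned, while the in-memory field stays an `Int`, since a Java array cannot exceed `Int.MaxValue` elements anyway.

```scala
import org.apache.spark.unsafe.Platform

val buffer = new Array[Byte](64)
// Writer side: store the element count in the first 8 bytes of the array.
Platform.putLong(buffer, Platform.BYTE_ARRAY_OFFSET, 3L)

// Reader side: read the word back and narrow it to an Int.
val numElementsLong = Platform.getLong(buffer, Platform.BYTE_ARRAY_OFFSET)
assert(numElementsLong >= 0 && numElementsLong <= Int.MaxValue)
val numElements = numElementsLong.toInt
```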





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73565969
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -25,30 +25,36 @@
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * An Unsafe implementation of Array which is backed by raw memory instead 
of Java objects.
  *
- * Each tuple has three parts: [numElements] [offsets] [values]
+ * Each tuple has four parts: [numElements][null bits][values or 
offset][variable length portion]
--- End diff --

Each array?





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73565692
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -18,27 +18,131 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
+import org.apache.spark.unsafe.Platform
 
 class UnsafeArraySuite extends SparkFunSuite {
 
-  test("from primitive int array") {
-val array = Array(1, 10, 100)
-val unsafe = UnsafeArrayData.fromPrimitiveArray(array)
-assert(unsafe.numElements == 3)
-assert(unsafe.getSizeInBytes == 4 + 4 * 3 + 4 * 3)
-assert(unsafe.getInt(0) == 1)
-assert(unsafe.getInt(1) == 10)
-assert(unsafe.getInt(2) == 100)
+  val booleanArray = Array(false, true)
+  val shortArray = Array(1.toShort, 10.toShort, 100.toShort)
+  val intArray = Array(1, 10, 100)
+  val longArray = Array(1.toLong, 10.toLong, 100.toLong)
+  val floatArray = Array(1.1.toFloat, 2.2.toFloat, 3.3.toFloat)
+  val doubleArray = Array(1.1, 2.2, 3.3)
+  val stringArray = Array("1", "10", "100")
--- End diff --

Can we also have tests for DateType, TimestampType, and DecimalType (with 
different precisions)?
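
One possible shape for that extra test data (hypothetical values, following the encoder-based pattern already used in this suite); dates and timestamps are internally days and microseconds, so `Int`/`Long` arrays pair with `DateType`/`TimestampType`, and the decimals can cover more than one precision:

```scala
val dateArray = Array(100, 10000)              // days since the epoch
val timestampArray = Array(100L, 10000000L)    // microseconds since the epoch
val smallDecimalArray = Array(BigDecimal("1.1"), BigDecimal("2.2"))
val largeDecimalArray = Array(BigDecimal("1234567890.12345"))
```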





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-08-04 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r73565092
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
 ---
@@ -73,8 +73,8 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
 checkActualSize(BINARY, Array.fill[Byte](4)(0.toByte), 4 + 4)
 checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8)
 checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5)
-checkActualSize(ARRAY_TYPE, Array[Any](1), 16)
--- End diff --

Is this an array of int or long?





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r70010656
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+/* 4 + 4 * count // Use this expression for SPARK-15962 */
+UnsafeArrayData.calculateHeaderPortionInBytes(count)
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intUnsafeArray = intEncoder.toRow(intPrimitiveArray).getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0
+var i = 0
+while (i < len) {
+  sum += intUnsafeArray.getInt(i)
+  i += 1
+}
+n += 1
+  }
+}
+
+val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble 
}
+val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+val doubleUnsafeArray = 
doubleEncoder.toRow(doublePrimitiveArray).getArray(0)
+val readDoubleArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = doubleUnsafeArray.numElements
+var sum = 0.0
+var i = 0
+while (i < len) {
+  sum += doubleUnsafeArray.getDouble(i)
+  i += 1
+}
+n += 1
+  }
+}
+
+val benchmark = new Benchmark("Read UnsafeArrayData", count * iters)
+benchmark.addCase("Int")(readIntArray)
+benchmark.addCase("Double")(readDoubleArray)
+benchmark.run
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4
+Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
+
+Read UnsafeArrayData:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Int                                            279 /  294        600.4           1.7       1.0X
+Double                                         296 /  303        567.0           1.8       0.9X
+*/
+  }
+
+  def writeUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+var intTotalLength: Int = 0
+val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val writeIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+intTotalLength += 
intEncoder.toRow(intPrimitiveArray).getArray(0).numElements()
+n += 1
+  }
+}
+
+var doubleTotalLength: Int = 0
+val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble 
}
+val doubleEncoder = 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r70010528
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -341,63 +327,115 @@ public UnsafeArrayData copy() {
 return arrayCopy;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(int[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 8) {
-  throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
-"it's too big.");
-}
+  @Override
+  public boolean[] toBooleanArray() {
+int size = numElements();
+boolean[] values = new boolean[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BOOLEAN_ARRAY_OFFSET, size);
+return values;
+  }
 
-final int offsetRegionSize = 4 * arr.length;
-final int valueRegionSize = 4 * arr.length;
-final int totalSize = 4 + offsetRegionSize + valueRegionSize;
-final byte[] data = new byte[totalSize];
+  @Override
+  public byte[] toByteArray() {
+int size = numElements();
+byte[] values = new byte[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, arr.length);
+  @Override
+  public short[] toShortArray() {
+int size = numElements();
+short[] values = new short[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.SHORT_ARRAY_OFFSET, size * 2);
+return values;
+  }
 
-int offsetPosition = Platform.BYTE_ARRAY_OFFSET + 4;
-int valueOffset = 4 + offsetRegionSize;
-for (int i = 0; i < arr.length; i++) {
-  Platform.putInt(data, offsetPosition, valueOffset);
-  offsetPosition += 4;
-  valueOffset += 4;
-}
+  @Override
+  public int[] toIntArray() {
+int size = numElements();
+int[] values = new int[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.INT_ARRAY_OFFSET, size * 4);
+return values;
+  }
 
-Platform.copyMemory(arr, Platform.INT_ARRAY_OFFSET, data,
-  Platform.BYTE_ARRAY_OFFSET + 4 + offsetRegionSize, valueRegionSize);
+  @Override
+  public long[] toLongArray() {
+int size = numElements();
+long[] values = new long[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.LONG_ARRAY_OFFSET, size * 8);
+return values;
+  }
 
-UnsafeArrayData result = new UnsafeArrayData();
-result.pointTo(data, Platform.BYTE_ARRAY_OFFSET, totalSize);
-return result;
+  @Override
+  public float[] toFloatArray() {
+int size = numElements();
+float[] values = new float[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.FLOAT_ARRAY_OFFSET, size * 4);
+return values;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(double[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 12) {
+  @Override
+  public double[] toDoubleArray() {
+int size = numElements();
+double[] values = new double[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.DOUBLE_ARRAY_OFFSET, size * 8);
+return values;
+  }
+
+  private static UnsafeArrayData fromPrimitiveArray(
+   Object arr, int offset, int length, int elementSize) {
+final int headerInBytes = calculateHeaderPortionInBytes(length);
+final int valueRegionInBytes = elementSize * length;
+final int totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) 
/ 8;
--- End diff --

Sorry, I was wrong :(  We need to declare them all as long in order to do the
overflow check.
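
(An aside for illustration, not code from the PR: a tiny Scala sketch of why the intermediate sizes have to be widened to Long before the "too big" check; the element count, element size, and header formula below are assumptions.)

    val numElements = 300 * 1000 * 1000                 // 300M elements (hypothetical)
    val elementSize = 8                                 // e.g. a long/double array

    // With Int arithmetic the product wraps around to a negative value,
    // so a guard such as `totalSize > Int.MaxValue` can never fire.
    val wrapped: Int = elementSize * numElements        // overflows: 2.4e9 > Int.MaxValue

    // With Long arithmetic the true byte count survives and the guard is meaningful.
    val headerInBytes: Long = 8L + ((numElements + 63) / 64) * 8L   // assumed header layout
    val valueRegionInBytes: Long = elementSize.toLong * numElements
    val totalSizeInLongs: Long = (headerInBytes + valueRegionInBytes + 7) / 8
    require(totalSizeInLongs * 8 <= Int.MaxValue,
      "Cannot convert this array to unsafe format as it's too big.")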





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69927398
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
+val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
+size
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+var intResult: Int = 0
+val intBuffer = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intInternalRow = intEncoder.toRow(intBuffer)
+val intUnsafeArray = intInternalRow.getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0.toInt
+var i = 0
+while (i < len) {
+  sum += intUnsafeArray.getInt(i)
+  i += 1
+}
+intResult = sum
+n += 1
+  }
+}
+
+var doubleResult: Double = 0
+val doubleBuffer = Array.fill[Double](count) { rand.nextDouble }
+val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+val doubleInternalRow = doubleEncoder.toRow(doubleBuffer)
+val doubleUnsafeArray = doubleInternalRow.getArray(0)
+val readDoubleArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = doubleUnsafeArray.numElements
+var sum = 0.toDouble
+var i = 0
+while (i < len) {
+  sum += doubleUnsafeArray.getDouble(i)
+  i += 1
+}
+doubleResult = sum
+n += 1
+  }
+}
+
+val benchmark = new Benchmark("Read UnsafeArrayData", count * iters)
+benchmark.addCase("Int")(readIntArray)
+benchmark.addCase("Double")(readDoubleArray)
+benchmark.run
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4
+Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
+
+Read UnsafeArrayData:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Int                                            279 /  294        600.4           1.7       1.0X
+Double                                         296 /  303        567.0           1.8       0.9X
+*/
+  }
+
+  def writeUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+
+val intUnsafeRow = new UnsafeRow(1)
+val intUnsafeArrayWriter = new UnsafeArrayWriter
--- End diff --

Got it. My interpretation was to use an `UnsafeArray` generated by 
`encoder.toRow(array)` for the benchmark. I will update `writeUnsafeArray` to 
measure the elapsed time of `encoder.toRow(array)`.
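
(A rough sketch of what the reworked write benchmark could look like if it times the whole `encoder.toRow(array)` conversion, in line with the revised version of this file quoted earlier in the thread; names are illustrative.)

    var intTotalLength: Int = 0
    val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
    val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
    val writeIntArray = { i: Int =>
      var n = 0
      while (n < iters) {
        // the primitive-array -> UnsafeRow -> UnsafeArrayData conversion is what gets timed
        intTotalLength += intEncoder.toRow(intPrimitiveArray).getArray(0).numElements()
        n += 1
      }
    }
    benchmark.addCase("Int")(writeIntArray)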



[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69850568
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
+val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
+size
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+var intResult: Int = 0
+val intBuffer = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intInternalRow = intEncoder.toRow(intBuffer)
+val intUnsafeArray = intInternalRow.getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0.toInt
+var i = 0
+while (i < len) {
+  sum += intUnsafeArray.getInt(i)
+  i += 1
+}
+intResult = sum
+n += 1
+  }
+}
+
+var doubleResult: Double = 0
+val doubleBuffer = Array.fill[Double](count) { rand.nextDouble }
+val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+val doubleInternalRow = doubleEncoder.toRow(doubleBuffer)
+val doubleUnsafeArray = doubleInternalRow.getArray(0)
+val readDoubleArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = doubleUnsafeArray.numElements
+var sum = 0.toDouble
+var i = 0
+while (i < len) {
+  sum += doubleUnsafeArray.getDouble(i)
+  i += 1
+}
+doubleResult = sum
+n += 1
+  }
+}
+
+val benchmark = new Benchmark("Read UnsafeArrayData", count * iters)
+benchmark.addCase("Int")(readIntArray)
+benchmark.addCase("Double")(readDoubleArray)
+benchmark.run
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4
+Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
+
+Read UnsafeArrayData:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Int                                            279 /  294        600.4           1.7       1.0X
+Double                                         296 /  303        567.0           1.8       0.9X
+*/
+  }
+
+  def writeUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+
+val intUnsafeRow = new UnsafeRow(1)
+val intUnsafeArrayWriter = new UnsafeArrayWriter
+val intBufferHolder = new BufferHolder(intUnsafeRow, 64)
+intBufferHolder.reset()
+intUnsafeArrayWriter.initialize(intBufferHolder, count, 4)
+val intCursor = intBufferHolder.cursor
+val writeIntArray = { i: Int =>
+  var n = 0
+  while (n < 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69850526
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
+val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
+size
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+var intResult: Int = 0
+val intBuffer = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intInternalRow = intEncoder.toRow(intBuffer)
+val intUnsafeArray = intInternalRow.getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0.toInt
+var i = 0
+while (i < len) {
+  sum += intUnsafeArray.getInt(i)
+  i += 1
+}
+intResult = sum
+n += 1
+  }
+}
+
+var doubleResult: Double = 0
+val doubleBuffer = Array.fill[Double](count) { rand.nextDouble }
+val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+val doubleInternalRow = doubleEncoder.toRow(doubleBuffer)
+val doubleUnsafeArray = doubleInternalRow.getArray(0)
+val readDoubleArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = doubleUnsafeArray.numElements
+var sum = 0.toDouble
+var i = 0
+while (i < len) {
+  sum += doubleUnsafeArray.getDouble(i)
+  i += 1
+}
+doubleResult = sum
+n += 1
+  }
+}
+
+val benchmark = new Benchmark("Read UnsafeArrayData", count * iters)
+benchmark.addCase("Int")(readIntArray)
+benchmark.addCase("Double")(readDoubleArray)
+benchmark.run
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4
+Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
+
+Read UnsafeArrayData:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Int                                            279 /  294        600.4           1.7       1.0X
+Double                                         296 /  303        567.0           1.8       0.9X
+*/
+  }
+
+  def writeUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+
+val intUnsafeRow = new UnsafeRow(1)
+val intUnsafeArrayWriter = new UnsafeArrayWriter
--- End diff --

have you seen my comment here? 
https://github.com/apache/spark/pull/13680/files#r69392823

testing the array writer is so low level, and people are more interested in 
writing the whole array. If you take a look at what `encoder.toRow` does, it 
generates a 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69850344
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
+val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
+size
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+var intResult: Int = 0
+val intBuffer = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intInternalRow = intEncoder.toRow(intBuffer)
+val intUnsafeArray = intInternalRow.getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0.toInt
+var i = 0
+while (i < len) {
+  sum += intUnsafeArray.getInt(i)
+  i += 1
+}
+intResult = sum
--- End diff --

did you see a very different performance result without this assignment?





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69850312
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
+val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
+size
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+var intResult: Int = 0
+val intBuffer = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intInternalRow = intEncoder.toRow(intBuffer)
+val intUnsafeArray = intInternalRow.getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0.toInt
+var i = 0
+while (i < len) {
+  sum += intUnsafeArray.getInt(i)
+  i += 1
+}
+intResult = sum
+n += 1
+  }
+}
+
+var doubleResult: Double = 0
+val doubleBuffer = Array.fill[Double](count) { rand.nextDouble }
+val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+val doubleInternalRow = doubleEncoder.toRow(doubleBuffer)
+val doubleUnsafeArray = doubleInternalRow.getArray(0)
+val readDoubleArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = doubleUnsafeArray.numElements
+var sum = 0.toDouble
--- End diff --

`var sum = 0L`





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69850289
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
+val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
+size
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+var intResult: Int = 0
+val intBuffer = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intInternalRow = intEncoder.toRow(intBuffer)
+val intUnsafeArray = intInternalRow.getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0.toInt
--- End diff --

unnecessary `toInt`





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69850272
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
+val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
+size
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+val rand = new Random(42)
+
+var intResult: Int = 0
+val intBuffer = Array.fill[Int](count) { rand.nextInt }
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intInternalRow = intEncoder.toRow(intBuffer)
+val intUnsafeArray = intInternalRow.getArray(0)
--- End diff --

combine these 2 lines
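
For example, something along the lines of:

    val intUnsafeArray = intEncoder.toRow(intBuffer).getArray(0)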





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69850225
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
+val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
+size
--- End diff --

we can just return `UnsafeArrayData.calculateHeaderPortionInBytes(count)`
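
i.e. the helper could simply be (sketch):

    def calculateHeaderPortionInBytes(count: Int): Int =
      UnsafeArrayData.calculateHeaderPortionInBytes(count)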





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69850211
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  1. replace ignore(...) with test(...)
+ *  2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
--- End diff --

remove this comment?





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69850185
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -18,27 +18,126 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
+import org.apache.spark.unsafe.Platform
 
 class UnsafeArraySuite extends SparkFunSuite {
 
-  test("from primitive int array") {
-val array = Array(1, 10, 100)
-val unsafe = UnsafeArrayData.fromPrimitiveArray(array)
-assert(unsafe.numElements == 3)
-assert(unsafe.getSizeInBytes == 4 + 4 * 3 + 4 * 3)
-assert(unsafe.getInt(0) == 1)
-assert(unsafe.getInt(1) == 10)
-assert(unsafe.getInt(2) == 100)
+  val booleanArray = Array(false, true)
+  val shortArray = Array(1.toShort, 10.toShort, 100.toShort)
+  val intArray = Array(1, 10, 100)
+  val longArray = Array(1.toLong, 10.toLong, 100.toLong)
+  val floatArray = Array(1.1.toFloat, 2.2.toFloat, 3.3.toFloat)
+  val doubleArray = Array(1.1, 2.2, 3.3)
+
+  val intMultiDimArray = Array(Array(1, 10), Array(2, 20, 200), Array(3, 
30, 300, 3000))
+  val doubleMultiDimArray = Array(
+Array(1.1, 11.1), Array(2.2, 22.2, 222.2), Array(3.3, 33.3, 333.3, 3333.3))
+
+  test("read array") {
+val unsafeBoolean = ExpressionEncoder[Array[Boolean]].resolveAndBind().
+  toRow(booleanArray).getArray(0)
+assert(unsafeBoolean.isInstanceOf[UnsafeArrayData])
+assert(unsafeBoolean.numElements == booleanArray.length)
+booleanArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeBoolean.getBoolean(i) == e)
+}
+
+val unsafeShort = ExpressionEncoder[Array[Short]].resolveAndBind().
+  toRow(shortArray).getArray(0)
+assert(unsafeShort.isInstanceOf[UnsafeArrayData])
+assert(unsafeShort.numElements == shortArray.length)
+shortArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeShort.getShort(i) == e)
+}
+
+val unsafeInt = ExpressionEncoder[Array[Int]].resolveAndBind().
+  toRow(intArray).getArray(0)
+assert(unsafeInt.isInstanceOf[UnsafeArrayData])
+assert(unsafeInt.numElements == intArray.length)
+intArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeInt.getInt(i) == e)
+}
+
+val unsafeLong = ExpressionEncoder[Array[Long]].resolveAndBind().
+  toRow(longArray).getArray(0)
+assert(unsafeLong.isInstanceOf[UnsafeArrayData])
+assert(unsafeLong.numElements == longArray.length)
+longArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeLong.getLong(i) == e)
+}
+
+val unsafeFloat = ExpressionEncoder[Array[Float]].resolveAndBind().
+  toRow(floatArray).getArray(0)
+assert(unsafeFloat.isInstanceOf[UnsafeArrayData])
+assert(unsafeFloat.numElements == floatArray.length)
+floatArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeFloat.getFloat(i) == e)
+}
+
+val unsafeDouble = ExpressionEncoder[Array[Double]].resolveAndBind().
+  toRow(doubleArray).getArray(0)
+assert(unsafeDouble.isInstanceOf[UnsafeArrayData])
+assert(unsafeDouble.numElements == doubleArray.length)
+doubleArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeDouble.getDouble(i) == e)
+}
+
+val unsafeMultiDimInt = 
ExpressionEncoder[Array[Array[Int]]].resolveAndBind().
+  toRow(intMultiDimArray).getArray(0)
+assert(unsafeMultiDimInt.isInstanceOf[UnsafeArrayData])
+assert(unsafeMultiDimInt.numElements == intMultiDimArray.length)
+intMultiDimArray.zipWithIndex.map { case (a, j) =>
+  val u = unsafeMultiDimInt.getArray(j)
+  assert(u.isInstanceOf[UnsafeArrayData])
+  assert(u.numElements == a.length)
+  a.zipWithIndex.map { case (e, i) =>
+assert(u.getInt(i) == e)
+  }
+}
+
+val unsafeMultiDimDouble = 
ExpressionEncoder[Array[Array[Double]]].resolveAndBind().
+  toRow(doubleMultiDimArray).getArray(0)
+assert(unsafeDouble.isInstanceOf[UnsafeArrayData])
+assert(unsafeMultiDimDouble.numElements == doubleMultiDimArray.length)
+doubleMultiDimArray.zipWithIndex.map { case (a, j) =>
+  val u = unsafeMultiDimDouble.getArray(j)
+  assert(u.isInstanceOf[UnsafeArrayData])
+  assert(u.numElements == a.length)
+  a.zipWithIndex.map { case (e, i) =>
+assert(u.getDouble(i) == e)
+  }
+}
+  }
+
+  test("from primitive array") {
+val unsafeInt = 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69850087
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -18,27 +18,126 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
+import org.apache.spark.unsafe.Platform
 
 class UnsafeArraySuite extends SparkFunSuite {
 
-  test("from primitive int array") {
-val array = Array(1, 10, 100)
-val unsafe = UnsafeArrayData.fromPrimitiveArray(array)
-assert(unsafe.numElements == 3)
-assert(unsafe.getSizeInBytes == 4 + 4 * 3 + 4 * 3)
-assert(unsafe.getInt(0) == 1)
-assert(unsafe.getInt(1) == 10)
-assert(unsafe.getInt(2) == 100)
+  val booleanArray = Array(false, true)
+  val shortArray = Array(1.toShort, 10.toShort, 100.toShort)
+  val intArray = Array(1, 10, 100)
+  val longArray = Array(1.toLong, 10.toLong, 100.toLong)
+  val floatArray = Array(1.1.toFloat, 2.2.toFloat, 3.3.toFloat)
+  val doubleArray = Array(1.1, 2.2, 3.3)
--- End diff --

let's also test string array
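
(A minimal sketch of such a case, following the pattern of the other fixtures in this suite; the values are illustrative.)

    val stringArray = Array("1", "10", "100")

    val unsafeString = ExpressionEncoder[Array[String]].resolveAndBind().
      toRow(stringArray).getArray(0)
    assert(unsafeString.isInstanceOf[UnsafeArrayData])
    assert(unsafeString.numElements == stringArray.length)
    stringArray.zipWithIndex.map { case (e, i) =>
      assert(unsafeString.getUTF8String(i).toString == e)
    }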





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69850001
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -33,91 +38,144 @@
   // The offset of the global buffer where we start to write this array.
   private int startingOffset;
 
-  public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
-// We need 4 bytes to store numElements and 4 bytes each element to 
store offset.
-final int fixedSize = 4 + 4 * numElements;
+  // The number of elements in this array
+  private int numElements;
+
+  private int headerInBytes;
+
+  private void assertIndexIsValid(int index) {
+assert index >= 0 : "index (" + index + ") should >= 0";
+assert index < numElements : "index (" + index + ") should < " + 
numElements;
+  }
+
+  public void initialize(BufferHolder holder, int numElements, int 
elementSize) {
+// We need 4 bytes to store numElements in header
+this.numElements = numElements;
+this.headerInBytes = calculateHeaderPortionInBytes(numElements);
 
 this.holder = holder;
 this.startingOffset = holder.cursor;
 
-holder.grow(fixedSize);
-Platform.putInt(holder.buffer, holder.cursor, numElements);
-holder.cursor += fixedSize;
+// Grows the global buffer ahead for header and fixed size data.
+holder.grow(headerInBytes + elementSize * numElements);
+
+// Write numElements and clear out null bits to header
+Platform.putInt(holder.buffer, startingOffset, numElements);
+for (int i = 4; i < headerInBytes; i += 8) {
+  Platform.putLong(holder.buffer, startingOffset + i, 0L);
+}
+holder.cursor += (headerInBytes + elementSize * numElements);
+  }
+
+  private long getElementOffset(int ordinal, int elementSize) {
+return startingOffset + headerInBytes + ordinal * elementSize;
+  }
+
+  public void setOffset(int ordinal, int currentCursor) {
--- End diff --

do we always pass in `holder.cursor` as `currentCursor`?





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69849961
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
@@ -189,28 +189,29 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
 
 val jt = ctx.javaType(et)
 
-val fixedElementSize = et match {
+val elementOrOffsetSize = et match {
   case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS => 8
   case _ if ctx.isPrimitiveType(jt) => et.defaultSize
-  case _ => 0
+  case _ => 8  // we need 8 bytes to store offset and length for 
variable-length types
 }
 
+val tmpCursor = ctx.freshName("tmpCursor")
--- End diff --

it's never used





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69849874
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
@@ -189,28 +189,29 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
 
 val jt = ctx.javaType(et)
 
-val fixedElementSize = et match {
+val elementOrOffsetSize = et match {
   case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS => 8
   case _ if ctx.isPrimitiveType(jt) => et.defaultSize
-  case _ => 0
+  case _ => 8  // we need 8 bytes to store offset and length for 
variable-length types
--- End diff --

It should be 4 now.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69849567
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -19,9 +19,14 @@
 
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
+import java.util.Arrays;
--- End diff --

it's still here...





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69849512
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -341,63 +328,115 @@ public UnsafeArrayData copy() {
 return arrayCopy;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(int[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 8) {
-  throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
-"it's too big.");
-}
+  @Override
+  public boolean[] toBooleanArray() {
+int size = numElements();
+boolean[] values = new boolean[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BOOLEAN_ARRAY_OFFSET, size);
+return values;
+  }
 
-final int offsetRegionSize = 4 * arr.length;
-final int valueRegionSize = 4 * arr.length;
-final int totalSize = 4 + offsetRegionSize + valueRegionSize;
-final byte[] data = new byte[totalSize];
+  @Override
+  public byte[] toByteArray() {
+int size = numElements();
+byte[] values = new byte[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, arr.length);
+  @Override
+  public short[] toShortArray() {
+int size = numElements();
+short[] values = new short[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.SHORT_ARRAY_OFFSET, size * 2);
+return values;
+  }
 
-int offsetPosition = Platform.BYTE_ARRAY_OFFSET + 4;
-int valueOffset = 4 + offsetRegionSize;
-for (int i = 0; i < arr.length; i++) {
-  Platform.putInt(data, offsetPosition, valueOffset);
-  offsetPosition += 4;
-  valueOffset += 4;
-}
+  @Override
+  public int[] toIntArray() {
+int size = numElements();
+int[] values = new int[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.INT_ARRAY_OFFSET, size * 4);
+return values;
+  }
 
-Platform.copyMemory(arr, Platform.INT_ARRAY_OFFSET, data,
-  Platform.BYTE_ARRAY_OFFSET + 4 + offsetRegionSize, valueRegionSize);
+  @Override
+  public long[] toLongArray() {
+int size = numElements();
+long[] values = new long[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.LONG_ARRAY_OFFSET, size * 8);
+return values;
+  }
 
-UnsafeArrayData result = new UnsafeArrayData();
-result.pointTo(data, Platform.BYTE_ARRAY_OFFSET, totalSize);
-return result;
+  @Override
+  public float[] toFloatArray() {
+int size = numElements();
+float[] values = new float[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.FLOAT_ARRAY_OFFSET, size * 4);
+return values;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(double[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 12) {
+  @Override
+  public double[] toDoubleArray() {
+int size = numElements();
+double[] values = new double[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.DOUBLE_ARRAY_OFFSET, size * 8);
+return values;
+  }
+
+  private static UnsafeArrayData fromPrimitiveArray(
+   Object arr, int offset, int length, int elementSize) {
+final long headerInBytes = calculateHeaderPortionInBytes(length);
+final long valueRegionInBytes = (long)elementSize * (long)length;
+final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) 
/ 8;
--- End diff --

we should declare `totalSizeInLongs` as int





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69849441
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -341,63 +328,115 @@ public UnsafeArrayData copy() {
 return arrayCopy;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(int[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 8) {
-  throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
-"it's too big.");
-}
+  @Override
+  public boolean[] toBooleanArray() {
+int size = numElements();
+boolean[] values = new boolean[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BOOLEAN_ARRAY_OFFSET, size);
+return values;
+  }
 
-final int offsetRegionSize = 4 * arr.length;
-final int valueRegionSize = 4 * arr.length;
-final int totalSize = 4 + offsetRegionSize + valueRegionSize;
-final byte[] data = new byte[totalSize];
+  @Override
+  public byte[] toByteArray() {
+int size = numElements();
+byte[] values = new byte[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, arr.length);
+  @Override
+  public short[] toShortArray() {
+int size = numElements();
+short[] values = new short[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.SHORT_ARRAY_OFFSET, size * 2);
+return values;
+  }
 
-int offsetPosition = Platform.BYTE_ARRAY_OFFSET + 4;
-int valueOffset = 4 + offsetRegionSize;
-for (int i = 0; i < arr.length; i++) {
-  Platform.putInt(data, offsetPosition, valueOffset);
-  offsetPosition += 4;
-  valueOffset += 4;
-}
+  @Override
+  public int[] toIntArray() {
+int size = numElements();
+int[] values = new int[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.INT_ARRAY_OFFSET, size * 4);
+return values;
+  }
 
-Platform.copyMemory(arr, Platform.INT_ARRAY_OFFSET, data,
-  Platform.BYTE_ARRAY_OFFSET + 4 + offsetRegionSize, valueRegionSize);
+  @Override
+  public long[] toLongArray() {
+int size = numElements();
+long[] values = new long[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.LONG_ARRAY_OFFSET, size * 8);
+return values;
+  }
 
-UnsafeArrayData result = new UnsafeArrayData();
-result.pointTo(data, Platform.BYTE_ARRAY_OFFSET, totalSize);
-return result;
+  @Override
+  public float[] toFloatArray() {
+int size = numElements();
+float[] values = new float[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.FLOAT_ARRAY_OFFSET, size * 4);
+return values;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(double[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 12) {
+  @Override
+  public double[] toDoubleArray() {
+int size = numElements();
+double[] values = new double[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.DOUBLE_ARRAY_OFFSET, size * 8);
+return values;
+  }
+
+  private static UnsafeArrayData fromPrimitiveArray(
+   Object arr, int offset, int length, int elementSize) {
+final long headerInBytes = calculateHeaderPortionInBytes(length);
+final long valueRegionInBytes = (long)elementSize * (long)length;
+final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) 
/ 8;
+if (totalSizeInLongs * 8> Integer.MAX_VALUE) {
--- End diff --

nit: a space after `8`
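For reference, with the nit applied the guard would read roughly like this (a sketch based on the quoted diff, not necessarily the merged code):

    // totalSizeInLongs is a long, so the multiplication and comparison happen in 64-bit arithmetic
    if (totalSizeInLongs * 8 > Integer.MAX_VALUE) {
      throw new UnsupportedOperationException("Cannot convert this array to " +
        "unsafe format as it's too big.");
    }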





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69849225
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -237,62 +229,57 @@ public Decimal getDecimal(int ordinal, int precision, 
int scale) {
 
   @Override
   public UTF8String getUTF8String(int ordinal) {
-assertIndexIsValid(ordinal);
-final int offset = getElementOffset(ordinal);
-if (offset < 0) return null;
-final int size = getElementSize(offset, ordinal);
+if (isNullAt(ordinal)) return null;
+final int offset = getInt(ordinal);
+final int size = getSize(ordinal);
 return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
   }
 
   @Override
   public byte[] getBinary(int ordinal) {
-assertIndexIsValid(ordinal);
-final int offset = getElementOffset(ordinal);
-if (offset < 0) return null;
-final int size = getElementSize(offset, ordinal);
+if (isNullAt(ordinal)) return null;
+final int offset = getInt(ordinal);
+final int size = getSize(ordinal);
 final byte[] bytes = new byte[size];
 Platform.copyMemory(baseObject, baseOffset + offset, bytes, 
Platform.BYTE_ARRAY_OFFSET, size);
 return bytes;
   }
 
   @Override
   public CalendarInterval getInterval(int ordinal) {
-assertIndexIsValid(ordinal);
-final int offset = getElementOffset(ordinal);
-if (offset < 0) return null;
+if (isNullAt(ordinal)) return null;
+final long offsetAndSize = getLong(ordinal);
--- End diff --

it's unused
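For illustration only: with the unused local dropped, getInterval could follow the same pattern as getUTF8String/getBinary above. A sketch, assuming the interval is stored as a months long followed by a microseconds long at the element's offset (field names as in the surrounding class):

    @Override
    public CalendarInterval getInterval(int ordinal) {
      if (isNullAt(ordinal)) return null;
      final int offset = getInt(ordinal);
      // months is stored in the first long, microseconds in the second
      final int months = (int) Platform.getLong(baseObject, baseOffset + offset);
      final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8);
      return new CalendarInterval(months, microseconds);
    }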





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483637
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
+val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
+size
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+
+var intResult: Int = 0
+val intBuffer = new Array[Int](count)
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intInternalRow = intEncoder.toRow(intBuffer)
+val intUnsafeArray = intInternalRow.getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0.toInt
+var i = 0
+while (i < len) {
+  sum += intUnsafeArray.getInt(i)
+  i += 1
+}
+intResult = sum
+n += 1
+  }
+}
+
+var doubleResult: Double = 0
+val doubleBuffer = new Array[Double](count)
+val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+val doubleInternalRow = doubleEncoder.toRow(doubleBuffer)
+val doubleUnsafeArray = doubleInternalRow.getArray(0)
+val readDoubleArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = doubleUnsafeArray.numElements
+var sum = 0.toDouble
+var i = 0
+while (i < len) {
+  sum += doubleUnsafeArray.getDouble(i)
+  i += 1
+}
+doubleResult = sum
+n += 1
+  }
+}
+
+val benchmark = new Benchmark("Read UnsafeArrayData", count * iters)
+benchmark.addCase("Int")(readIntArray)
+benchmark.addCase("Double")(readDoubleArray)
+benchmark.run
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4
+Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
+
+Read UnsafeArrayData:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Int                                           281 /  296        597.5           1.7       1.0X
+Double                                        298 /  301        562.3           1.8       0.9X
+*/
+  }
+
+  def writeUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+
+val intUnsafeRow = new UnsafeRow(1)
+val intUnsafeArrayWriter = new UnsafeArrayWriter
+val intBufferHolder = new BufferHolder(intUnsafeRow, 64)
+intBufferHolder.reset()
+intUnsafeArrayWriter.initialize(intBufferHolder, count, 4)
+val intCursor = intBufferHolder.cursor
+val writeIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+intBufferHolder.cursor = intCursor
+val len = 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483623
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
+val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
+size
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+
+var intResult: Int = 0
+val intBuffer = new Array[Int](count)
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intInternalRow = intEncoder.toRow(intBuffer)
+val intUnsafeArray = intInternalRow.getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0.toInt
+var i = 0
+while (i < len) {
+  sum += intUnsafeArray.getInt(i)
+  i += 1
+}
+intResult = sum
+n += 1
+  }
+}
+
+var doubleResult: Double = 0
+val doubleBuffer = new Array[Double](count)
+val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+val doubleInternalRow = doubleEncoder.toRow(doubleBuffer)
+val doubleUnsafeArray = doubleInternalRow.getArray(0)
+val readDoubleArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = doubleUnsafeArray.numElements
+var sum = 0.toDouble
+var i = 0
+while (i < len) {
+  sum += doubleUnsafeArray.getDouble(i)
+  i += 1
+}
+doubleResult = sum
+n += 1
+  }
+}
+
+val benchmark = new Benchmark("Read UnsafeArrayData", count * iters)
+benchmark.addCase("Int")(readIntArray)
+benchmark.addCase("Double")(readDoubleArray)
+benchmark.run
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4
+Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
+
+Read UnsafeArrayData:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Int                                           281 /  296        597.5           1.7       1.0X
+Double                                        298 /  301        562.3           1.8       0.9X
+*/
+  }
+
+  def writeUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+
+val intUnsafeRow = new UnsafeRow(1)
+val intUnsafeArrayWriter = new UnsafeArrayWriter
+val intBufferHolder = new BufferHolder(intUnsafeRow, 64)
+intBufferHolder.reset()
+intUnsafeArrayWriter.initialize(intBufferHolder, count, 4)
+val intCursor = intBufferHolder.cursor
+val writeIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+intBufferHolder.cursor = intCursor
+val len = 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483629
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
+val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
+size
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+
+var intResult: Int = 0
+val intBuffer = new Array[Int](count)
+val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+val intInternalRow = intEncoder.toRow(intBuffer)
+val intUnsafeArray = intInternalRow.getArray(0)
+val readIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = intUnsafeArray.numElements
+var sum = 0.toInt
+var i = 0
+while (i < len) {
+  sum += intUnsafeArray.getInt(i)
+  i += 1
+}
+intResult = sum
+n += 1
+  }
+}
+
+var doubleResult: Double = 0
+val doubleBuffer = new Array[Double](count)
+val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+val doubleInternalRow = doubleEncoder.toRow(doubleBuffer)
+val doubleUnsafeArray = doubleInternalRow.getArray(0)
+val readDoubleArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+val len = doubleUnsafeArray.numElements
+var sum = 0.toDouble
+var i = 0
+while (i < len) {
+  sum += doubleUnsafeArray.getDouble(i)
+  i += 1
+}
+doubleResult = sum
+n += 1
+  }
+}
+
+val benchmark = new Benchmark("Read UnsafeArrayData", count * iters)
+benchmark.addCase("Int")(readIntArray)
+benchmark.addCase("Double")(readDoubleArray)
+benchmark.run
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4
+Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
+
+Read UnsafeArrayData:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Int                                           281 /  296        597.5           1.7       1.0X
+Double                                        298 /  301        562.3           1.8       0.9X
+*/
+  }
+
+  def writeUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+
+val intUnsafeRow = new UnsafeRow(1)
+val intUnsafeArrayWriter = new UnsafeArrayWriter
+val intBufferHolder = new BufferHolder(intUnsafeRow, 64)
+intBufferHolder.reset()
+intUnsafeArrayWriter.initialize(intBufferHolder, count, 4)
+val intCursor = intBufferHolder.cursor
+val writeIntArray = { i: Int =>
+  var n = 0
+  while (n < iters) {
+intBufferHolder.cursor = intCursor
+val len = 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483595
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
 ---
@@ -18,27 +18,110 @@
 package org.apache.spark.sql.catalyst.util
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
+import org.apache.spark.unsafe.Platform
 
 class UnsafeArraySuite extends SparkFunSuite {
 
-  test("from primitive int array") {
-val array = Array(1, 10, 100)
-val unsafe = UnsafeArrayData.fromPrimitiveArray(array)
-assert(unsafe.numElements == 3)
-assert(unsafe.getSizeInBytes == 4 + 4 * 3 + 4 * 3)
-assert(unsafe.getInt(0) == 1)
-assert(unsafe.getInt(1) == 10)
-assert(unsafe.getInt(2) == 100)
+  val booleanArray = Array(false, true)
+  val shortArray = Array(1.toShort, 10.toShort, 100.toShort)
+  val intArray = Array(1, 10, 100)
+  val longArray = Array(1.toLong, 10.toLong, 100.toLong)
+  val floatArray = Array(1.1.toFloat, 2.2.toFloat, 3.3.toFloat)
+  val doubleArray = Array(1.1, 2.2, 3.3)
+
+  test("read array") {
+val unsafeBoolean = ExpressionEncoder[Array[Boolean]].resolveAndBind().
+  toRow(booleanArray).getArray(0)
+assert(unsafeBoolean.isInstanceOf[UnsafeArrayData])
+assert(unsafeBoolean.numElements == 2)
+booleanArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeBoolean.getBoolean(i) == e)
+}
+
+val unsafeShort = ExpressionEncoder[Array[Short]].resolveAndBind().
+  toRow(shortArray).getArray(0)
+assert(unsafeShort.isInstanceOf[UnsafeArrayData])
+assert(unsafeShort.numElements == 3)
+shortArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeShort.getShort(i) == e)
+}
+
+val unsafeInt = ExpressionEncoder[Array[Int]].resolveAndBind().
+  toRow(intArray).getArray(0)
+assert(unsafeInt.isInstanceOf[UnsafeArrayData])
+assert(unsafeInt.numElements == 3)
+intArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeInt.getInt(i) == e)
+}
+
+val unsafeLong = ExpressionEncoder[Array[Long]].resolveAndBind().
+  toRow(longArray).getArray(0)
+assert(unsafeLong.isInstanceOf[UnsafeArrayData])
+assert(unsafeLong.numElements == 3)
+longArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeLong.getLong(i) == e)
+}
+
+val unsafeFloat = ExpressionEncoder[Array[Float]].resolveAndBind().
+  toRow(floatArray).getArray(0)
+assert(unsafeFloat.isInstanceOf[UnsafeArrayData])
+assert(unsafeFloat.numElements == 3)
+floatArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeFloat.getFloat(i) == e)
+}
+
+val unsafeDouble = ExpressionEncoder[Array[Double]].resolveAndBind().
+  toRow(doubleArray).getArray(0)
+assert(unsafeDouble.isInstanceOf[UnsafeArrayData])
+assert(unsafeDouble.numElements == 3)
+doubleArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeDouble.getDouble(i) == e)
+}
   }
 
-  test("from primitive double array") {
-val array = Array(1.1, 2.2, 3.3)
-val unsafe = UnsafeArrayData.fromPrimitiveArray(array)
-assert(unsafe.numElements == 3)
-assert(unsafe.getSizeInBytes == 4 + 4 * 3 + 8 * 3)
-assert(unsafe.getDouble(0) == 1.1)
-assert(unsafe.getDouble(1) == 2.2)
-assert(unsafe.getDouble(2) == 3.3)
+  test("from primitive array") {
+val unsafeInt = UnsafeArrayData.fromPrimitiveArray(intArray)
+assert(unsafeInt.numElements == 3)
+assert(unsafeInt.getSizeInBytes ==
+  ((4 + scala.math.ceil(3/64.toDouble) * 8 + 4 * 3 + 7).toInt / 8) * 8)
+intArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeInt.getInt(i) == e)
+}
+
+val unsafeDouble = UnsafeArrayData.fromPrimitiveArray(doubleArray)
+assert(unsafeDouble.numElements == 3)
+assert(unsafeDouble.getSizeInBytes ==
+  ((4 + scala.math.ceil(3/64.toDouble) * 8 + 8 * 3 + 7).toInt / 8) * 8)
+doubleArray.zipWithIndex.map { case (e, i) =>
+  assert(unsafeDouble.getDouble(i) == e)
+}
+  }
+
+  test("to primitive array") {
+val intCount = intArray.length
+val intUnsafeArray = new UnsafeArrayData
+val intHeader = UnsafeArrayData.calculateHeaderPortionInBytes(intCount)
+val intSize = intHeader + 4 * intCount
+val intBuffer = new Array[Byte](intSize)
+Platform.putInt(intBuffer, Platform.BYTE_ARRAY_OFFSET, intCount)
+intUnsafeArray.pointTo(intBuffer, 

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483609
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
+// Use this assignment for SPARK-15962
+// val size = 4 + 4 * count
+val size = UnsafeArrayData.calculateHeaderPortionInBytes(count)
+size
+  }
+
+  def readUnsafeArray(iters: Int): Unit = {
+val count = 1024 * 1024 * 16
+
+var intResult: Int = 0
+val intBuffer = new Array[Int](count)
--- End diff --

Done





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483573
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
@@ -222,16 +226,17 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
   case _ => s"$arrayWriter.write($index, $element);"
 }
 
+val dataType = if (ctx.isPrimitiveType(jt)) ctx.primitiveTypeName(et) 
else ""
--- End diff --

Sure, done





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483519
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -33,91 +38,144 @@
   // The offset of the global buffer where we start to write this array.
   private int startingOffset;
 
+  // The number of elements in this array
+  private int numElements;
+
+  private int headerInBytes;
+
+  private void assertIndexIsValid(int index) {
+assert index >= 0 : "index (" + index + ") should >= 0";
+assert index < numElements : "index (" + index + ") should < " + 
numElements;
+  }
+
   public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
--- End diff --

updated





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483560
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
@@ -189,29 +189,33 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
 
 val jt = ctx.javaType(et)
 
-val fixedElementSize = et match {
+val elementOrOffsetSize = et match {
   case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS => 8
   case _ if ctx.isPrimitiveType(jt) => et.defaultSize
-  case _ => 0
+  case _ => 8  // we need 8 bytes to store offset and length for 
variable-length types]
--- End diff --

Sorry, removed this





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483534
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -33,91 +38,144 @@
   // The offset of the global buffer where we start to write this array.
   private int startingOffset;
 
+  // The number of elements in this array
+  private int numElements;
+
+  private int headerInBytes;
+
+  private void assertIndexIsValid(int index) {
+assert index >= 0 : "index (" + index + ") should >= 0";
+assert index < numElements : "index (" + index + ") should < " + 
numElements;
+  }
+
   public void initialize(BufferHolder holder, int numElements, int 
fixedElementSize) {
-// We need 4 bytes to store numElements and 4 bytes each element to 
store offset.
-final int fixedSize = 4 + 4 * numElements;
+// We need 4 bytes to store numElements in header
+this.numElements = numElements;
+this.headerInBytes = calculateHeaderPortionInBytes(numElements);
 
 this.holder = holder;
 this.startingOffset = holder.cursor;
 
-holder.grow(fixedSize);
-Platform.putInt(holder.buffer, holder.cursor, numElements);
-holder.cursor += fixedSize;
+// Grows the global buffer ahead for header and fixed size data.
+holder.grow(headerInBytes + fixedElementSize * numElements);
+
+// Initialize information in header
--- End diff --

Thanks, added this comment





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483506
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -341,63 +325,115 @@ public UnsafeArrayData copy() {
 return arrayCopy;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(int[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 8) {
-  throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
-"it's too big.");
-}
+  @Override
+  public boolean[] toBooleanArray() {
+int size = numElements();
+boolean[] values = new boolean[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-final int offsetRegionSize = 4 * arr.length;
-final int valueRegionSize = 4 * arr.length;
-final int totalSize = 4 + offsetRegionSize + valueRegionSize;
-final byte[] data = new byte[totalSize];
+  @Override
+  public byte[] toByteArray() {
+int size = numElements();
+byte[] values = new byte[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, arr.length);
+  @Override
+  public short[] toShortArray() {
+int size = numElements();
+short[] values = new short[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.SHORT_ARRAY_OFFSET, size * 2);
+return values;
+  }
 
-int offsetPosition = Platform.BYTE_ARRAY_OFFSET + 4;
-int valueOffset = 4 + offsetRegionSize;
-for (int i = 0; i < arr.length; i++) {
-  Platform.putInt(data, offsetPosition, valueOffset);
-  offsetPosition += 4;
-  valueOffset += 4;
-}
+  @Override
+  public int[] toIntArray() {
+int size = numElements();
+int[] values = new int[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.INT_ARRAY_OFFSET, size * 4);
+return values;
+  }
 
-Platform.copyMemory(arr, Platform.INT_ARRAY_OFFSET, data,
-  Platform.BYTE_ARRAY_OFFSET + 4 + offsetRegionSize, valueRegionSize);
+  @Override
+  public long[] toLongArray() {
+int size = numElements();
+long[] values = new long[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.LONG_ARRAY_OFFSET, size * 8);
+return values;
+  }
 
-UnsafeArrayData result = new UnsafeArrayData();
-result.pointTo(data, Platform.BYTE_ARRAY_OFFSET, totalSize);
-return result;
+  @Override
+  public float[] toFloatArray() {
+int size = numElements();
+float[] values = new float[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.FLOAT_ARRAY_OFFSET, size * 4);
+return values;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(double[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 12) {
+  @Override
+  public double[] toDoubleArray() {
+int size = numElements();
+double[] values = new double[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.DOUBLE_ARRAY_OFFSET, size * 8);
+return values;
+  }
+
+  private static UnsafeArrayData fromPrimitiveArray(
+   Object arr, int offset, int length, int elementSize) {
+final long headerSize = calculateHeaderPortionInBytes(length);
+final long valueRegionSize = (long)elementSize * (long)length;
+final long allocationSize = (headerSize + valueRegionSize + 7) / 8;
+if (allocationSize > (long)Integer.MAX_VALUE) {
   throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
 "it's too big.");
 }
 
-final int offsetRegionSize = 4 * arr.length;
-final int valueRegionSize = 8 * arr.length;
-final int totalSize = 4 + offsetRegionSize + valueRegionSize;
-final byte[] data = new byte[totalSize];
+final long[] data = new long[(int)allocationSize];
 
-Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, arr.length);
-
-int offsetPosition = Platform.BYTE_ARRAY_OFFSET + 4;
-int valueOffset = 4 + offsetRegionSize;
-for (int i = 0; i < arr.length; i++) {
-  Platform.putInt(data, offsetPosition, valueOffset);
-  offsetPosition += 4;
-  valueOffset += 8;
-}
-
-

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483456
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -341,63 +325,115 @@ public UnsafeArrayData copy() {
 return arrayCopy;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(int[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 8) {
-  throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
-"it's too big.");
-}
+  @Override
+  public boolean[] toBooleanArray() {
+int size = numElements();
+boolean[] values = new boolean[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-final int offsetRegionSize = 4 * arr.length;
-final int valueRegionSize = 4 * arr.length;
-final int totalSize = 4 + offsetRegionSize + valueRegionSize;
-final byte[] data = new byte[totalSize];
+  @Override
+  public byte[] toByteArray() {
+int size = numElements();
+byte[] values = new byte[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, arr.length);
+  @Override
+  public short[] toShortArray() {
+int size = numElements();
+short[] values = new short[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.SHORT_ARRAY_OFFSET, size * 2);
+return values;
+  }
 
-int offsetPosition = Platform.BYTE_ARRAY_OFFSET + 4;
-int valueOffset = 4 + offsetRegionSize;
-for (int i = 0; i < arr.length; i++) {
-  Platform.putInt(data, offsetPosition, valueOffset);
-  offsetPosition += 4;
-  valueOffset += 4;
-}
+  @Override
+  public int[] toIntArray() {
+int size = numElements();
+int[] values = new int[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.INT_ARRAY_OFFSET, size * 4);
+return values;
+  }
 
-Platform.copyMemory(arr, Platform.INT_ARRAY_OFFSET, data,
-  Platform.BYTE_ARRAY_OFFSET + 4 + offsetRegionSize, valueRegionSize);
+  @Override
+  public long[] toLongArray() {
+int size = numElements();
+long[] values = new long[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.LONG_ARRAY_OFFSET, size * 8);
+return values;
+  }
 
-UnsafeArrayData result = new UnsafeArrayData();
-result.pointTo(data, Platform.BYTE_ARRAY_OFFSET, totalSize);
-return result;
+  @Override
+  public float[] toFloatArray() {
+int size = numElements();
+float[] values = new float[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.FLOAT_ARRAY_OFFSET, size * 4);
+return values;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(double[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 12) {
+  @Override
+  public double[] toDoubleArray() {
+int size = numElements();
+double[] values = new double[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.DOUBLE_ARRAY_OFFSET, size * 8);
+return values;
+  }
+
+  private static UnsafeArrayData fromPrimitiveArray(
+   Object arr, int offset, int length, int elementSize) {
+final long headerSize = calculateHeaderPortionInBytes(length);
+final long valueRegionSize = (long)elementSize * (long)length;
+final long allocationSize = (headerSize + valueRegionSize + 7) / 8;
+if (allocationSize > (long)Integer.MAX_VALUE) {
   throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
 "it's too big.");
 }
 
-final int offsetRegionSize = 4 * arr.length;
-final int valueRegionSize = 8 * arr.length;
-final int totalSize = 4 + offsetRegionSize + valueRegionSize;
-final byte[] data = new byte[totalSize];
+final long[] data = new long[(int)allocationSize];
 
-Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, arr.length);
-
-int offsetPosition = Platform.BYTE_ARRAY_OFFSET + 4;
-int valueOffset = 4 + offsetRegionSize;
-for (int i = 0; i < arr.length; i++) {
-  Platform.putInt(data, offsetPosition, valueOffset);
-  offsetPosition += 4;
-  valueOffset += 8;
-}
-
-

[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483421
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -341,63 +325,115 @@ public UnsafeArrayData copy() {
 return arrayCopy;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(int[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 8) {
-  throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
-"it's too big.");
-}
+  @Override
+  public boolean[] toBooleanArray() {
+int size = numElements();
+boolean[] values = new boolean[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-final int offsetRegionSize = 4 * arr.length;
-final int valueRegionSize = 4 * arr.length;
-final int totalSize = 4 + offsetRegionSize + valueRegionSize;
-final byte[] data = new byte[totalSize];
+  @Override
+  public byte[] toByteArray() {
+int size = numElements();
+byte[] values = new byte[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, arr.length);
+  @Override
+  public short[] toShortArray() {
+int size = numElements();
+short[] values = new short[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.SHORT_ARRAY_OFFSET, size * 2);
+return values;
+  }
 
-int offsetPosition = Platform.BYTE_ARRAY_OFFSET + 4;
-int valueOffset = 4 + offsetRegionSize;
-for (int i = 0; i < arr.length; i++) {
-  Platform.putInt(data, offsetPosition, valueOffset);
-  offsetPosition += 4;
-  valueOffset += 4;
-}
+  @Override
+  public int[] toIntArray() {
+int size = numElements();
+int[] values = new int[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.INT_ARRAY_OFFSET, size * 4);
+return values;
+  }
 
-Platform.copyMemory(arr, Platform.INT_ARRAY_OFFSET, data,
-  Platform.BYTE_ARRAY_OFFSET + 4 + offsetRegionSize, valueRegionSize);
+  @Override
+  public long[] toLongArray() {
+int size = numElements();
+long[] values = new long[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.LONG_ARRAY_OFFSET, size * 8);
+return values;
+  }
 
-UnsafeArrayData result = new UnsafeArrayData();
-result.pointTo(data, Platform.BYTE_ARRAY_OFFSET, totalSize);
-return result;
+  @Override
+  public float[] toFloatArray() {
+int size = numElements();
+float[] values = new float[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.FLOAT_ARRAY_OFFSET, size * 4);
+return values;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(double[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 12) {
+  @Override
+  public double[] toDoubleArray() {
+int size = numElements();
+double[] values = new double[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.DOUBLE_ARRAY_OFFSET, size * 8);
+return values;
+  }
+
+  private static UnsafeArrayData fromPrimitiveArray(
+   Object arr, int offset, int length, int elementSize) {
+final long headerSize = calculateHeaderPortionInBytes(length);
+final long valueRegionSize = (long)elementSize * (long)length;
+final long allocationSize = (headerSize + valueRegionSize + 7) / 8;
--- End diff --

updated these names
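The renamed computation, as it appears in the later revision quoted earlier in this thread:

    // headerSize      -> headerInBytes
    // valueRegionSize -> valueRegionInBytes
    // allocationSize  -> totalSizeInLongs
    final long headerInBytes = calculateHeaderPortionInBytes(length);
    final long valueRegionInBytes = (long) elementSize * (long) length;
    final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;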





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69483401
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -341,63 +325,115 @@ public UnsafeArrayData copy() {
 return arrayCopy;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(int[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 8) {
-  throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
-"it's too big.");
-}
+  @Override
+  public boolean[] toBooleanArray() {
+int size = numElements();
+boolean[] values = new boolean[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-final int offsetRegionSize = 4 * arr.length;
-final int valueRegionSize = 4 * arr.length;
-final int totalSize = 4 + offsetRegionSize + valueRegionSize;
-final byte[] data = new byte[totalSize];
+  @Override
+  public byte[] toByteArray() {
+int size = numElements();
+byte[] values = new byte[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.BYTE_ARRAY_OFFSET, size);
+return values;
+  }
 
-Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, arr.length);
+  @Override
+  public short[] toShortArray() {
+int size = numElements();
+short[] values = new short[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.SHORT_ARRAY_OFFSET, size * 2);
+return values;
+  }
 
-int offsetPosition = Platform.BYTE_ARRAY_OFFSET + 4;
-int valueOffset = 4 + offsetRegionSize;
-for (int i = 0; i < arr.length; i++) {
-  Platform.putInt(data, offsetPosition, valueOffset);
-  offsetPosition += 4;
-  valueOffset += 4;
-}
+  @Override
+  public int[] toIntArray() {
+int size = numElements();
+int[] values = new int[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.INT_ARRAY_OFFSET, size * 4);
+return values;
+  }
 
-Platform.copyMemory(arr, Platform.INT_ARRAY_OFFSET, data,
-  Platform.BYTE_ARRAY_OFFSET + 4 + offsetRegionSize, valueRegionSize);
+  @Override
+  public long[] toLongArray() {
+int size = numElements();
+long[] values = new long[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.LONG_ARRAY_OFFSET, size * 8);
+return values;
+  }
 
-UnsafeArrayData result = new UnsafeArrayData();
-result.pointTo(data, Platform.BYTE_ARRAY_OFFSET, totalSize);
-return result;
+  @Override
+  public float[] toFloatArray() {
+int size = numElements();
+float[] values = new float[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.FLOAT_ARRAY_OFFSET, size * 4);
+return values;
   }
 
-  public static UnsafeArrayData fromPrimitiveArray(double[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 12) {
+  @Override
+  public double[] toDoubleArray() {
+int size = numElements();
+double[] values = new double[size];
+Platform.copyMemory(
+  baseObject, baseOffset + headerInBytes, values, 
Platform.DOUBLE_ARRAY_OFFSET, size * 8);
+return values;
+  }
+
+  private static UnsafeArrayData fromPrimitiveArray(
+   Object arr, int offset, int length, int elementSize) {
+final long headerSize = calculateHeaderPortionInBytes(length);
+final long valueRegionSize = (long)elementSize * (long)length;
+final long allocationSize = (headerSize + valueRegionSize + 7) / 8;
+if (allocationSize > (long)Integer.MAX_VALUE) {
--- End diff --

removed cast
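The cast was redundant because comparing a long with Integer.MAX_VALUE already promotes the int constant to long; a minimal illustration:

    long allocationSize = 1L << 31;            // any value above Integer.MAX_VALUE
    if (allocationSize > Integer.MAX_VALUE) {  // the int constant is widened to long here
      // too large to back with a single Java array
    }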





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69469005
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
--- End diff --

Ah, got it. You mean we can remove it once we use the encoder.





[GitHub] spark pull request #13680: [SPARK-15962][SQL] Introduce implementation with ...

2016-07-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/13680#discussion_r69426474
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, 
UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData
+ * To run this:
+ *  build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+  def calculateHeaderPortionInBytes(count: Int) : Int = {
--- End diff --

I would like to keep this for ease of supporting multiple versions that 
have different headers.
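
To illustrate the point, the wrapper abstracts over two header layouts: the old one is the commented-out formula above, and the new one is sketched below from the null-bitmap layout discussed in this PR (treat the exact constants as an assumption, since they changed across revisions):

    // Old layout: 4 bytes for numElements plus a 4-byte offset per element.
    static int oldHeaderInBytes(int count) {
      return 4 + 4 * count;
    }

    // New layout (sketch): one word for numElements plus one 64-bit
    // null-bits word per 64 elements.
    static int newHeaderInBytes(int count) {
      return 8 + ((count + 63) / 64) * 8;
    }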




