Repository: spark
Updated Branches:
  refs/heads/master 92fd7f321 -> ed075e1ff


[SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10.0

## What changes were proposed in this pull request?

Upgrade Apache Arrow to 0.10.0

Version 0.10.0 includes a number of bug fixes and improvements; the following
pertain directly to usage in Spark (the Python-visible changes are sketched
below this list):
 * Allows for adding BinaryType support ARROW-2141
 * Bug fix related to array serialization ARROW-1973
 * Python 2 str is now converted to an Arrow string instead of bytes ARROW-2101
 * Python bytearrays are supported as input to pyarrow ARROW-2141
 * Java has a common reset interface for cleaning up complex vectors in Spark's ArrowWriter ARROW-1962
 * Cleaned up pyarrow type equality checks ARROW-2423
 * ArrowStreamWriter no longer holds references to ArrowBlocks ARROW-2632, ARROW-2645
 * Improved low-level handling of messages for RecordBatch ARROW-2704
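
For reference, a minimal pyarrow sketch of the Python-visible behavior changes
(the values and assertions are illustrative, not taken from this patch):

```python
import pyarrow as pa

# As of Arrow 0.9.1+, a Python 2 str is inferred as an Arrow string (utf8)
# rather than bytes (ARROW-2101); on Python 3 this was already the case.
arr = pa.array(["a", "b"])
assert pa.types.is_string(arr.type)

# bytearray values are accepted as input and yield a binary array (ARROW-2141).
bin_arr = pa.array([bytearray(b"ab"), bytearray(b"cd")])
assert pa.types.is_binary(bin_arr.type)
```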

## How was this patch tested?

Existing tests.

Author: Bryan Cutler <cutl...@gmail.com>

Closes #21939 from BryanCutler/arrow-upgrade-010.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed075e1f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed075e1f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed075e1f

Branch: refs/heads/master
Commit: ed075e1ff60cbb3e7b80b9d2f2ff37054412b934
Parents: 92fd7f3
Author: Bryan Cutler <cutl...@gmail.com>
Authored: Tue Aug 14 17:13:38 2018 -0700
Committer: Bryan Cutler <cutl...@gmail.com>
Committed: Tue Aug 14 17:13:38 2018 -0700

----------------------------------------------------------------------
 dev/deps/spark-deps-hadoop-2.6                  |  6 +++---
 dev/deps/spark-deps-hadoop-2.7                  |  6 +++---
 dev/deps/spark-deps-hadoop-3.1                  |  6 +++---
 pom.xml                                         |  2 +-
 python/pyspark/serializers.py                   |  2 ++
 .../spark/sql/vectorized/ArrowColumnVector.java | 12 ++++++------
 .../spark/sql/execution/arrow/ArrowWriter.scala | 20 +++-----------------
 .../vectorized/ArrowColumnVectorSuite.scala     |  4 ++--
 8 files changed, 23 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 3c0952f..bdab79c 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 310f1e4..ddaf9bb 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/dev/deps/spark-deps-hadoop-3.1
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index 9bff2a1..d25d7aa 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -12,9 +12,9 @@ aopalliance-1.0.jar
 aopalliance-repackaged-2.4.0-b34.jar
 apache-log4j-extras-1.2.17.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 45fca28..979d709 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,7 +190,7 @@
     If you are changing Arrow version specification, please check ./python/pyspark/sql/utils.py,
     ./python/run-tests.py and ./python/setup.py too.
     -->
-    <arrow.version>0.8.0</arrow.version>
+    <arrow.version>0.10.0</arrow.version>
 
     <test.java.home>${java.home}</test.java.home>
     <test.exclude.tags></test.exclude.tags>
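
The comment above points at the Python-side version pins. As a hedged
illustration (not part of this diff), PySpark callers guard on the installed
pyarrow version via `require_minimum_pyarrow_version` from
`python/pyspark/sql/utils.py`:

```python
from pyspark.sql.utils import require_minimum_pyarrow_version

# Raises ImportError if pyarrow is missing or older than the minimum
# version pinned in python/pyspark/sql/utils.py, which must be kept in
# sync with <arrow.version> above.
require_minimum_pyarrow_version()
```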

http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 82abf19..47c4c3e 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -229,12 +229,14 @@ def _create_batch(series, timezone):
     def create_array(s, t):
         mask = s.isnull()
         # Ensure timestamp series are in expected form for Spark internal representation
+        # TODO: maybe don't need None check anymore as of Arrow 0.9.1
         if t is not None and pa.types.is_timestamp(t):
             s = _check_series_convert_timestamps_internal(s.fillna(0), timezone)
             # TODO: need cast after Arrow conversion, ns values cause error with pandas 0.19.2
             return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
         elif t is not None and pa.types.is_string(t) and sys.version < '3':
             # TODO: need decode before converting to Arrow in Python 2
+            # TODO: don't need as of Arrow 0.9.1
             return pa.Array.from_pandas(s.apply(
                 lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t)
         elif t is not None and pa.types.is_decimal(t) and \
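
For context, a minimal standalone sketch of the `from_pandas` + unsafe-cast
pattern used in `create_array` above (the series and target type here are
illustrative, not from Spark):

```python
import pandas as pd
import pyarrow as pa

s = pd.Series([1.0, None, 3.0])
mask = s.isnull()

# Nulls are passed explicitly via `mask`; safe=False permits a lossy cast,
# mirroring the ns -> us truncation needed for timestamps above.
arr = pa.Array.from_pandas(s, mask=mask).cast(pa.float32(), safe=False)
print(arr)
```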

http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index 5aed87f..1c9beda 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -162,13 +162,13 @@ public final class ArrowColumnVector extends ColumnVector {
     } else if (vector instanceof ListVector) {
       ListVector listVector = (ListVector) vector;
       accessor = new ArrayAccessor(listVector);
-    } else if (vector instanceof NullableMapVector) {
-      NullableMapVector mapVector = (NullableMapVector) vector;
-      accessor = new StructAccessor(mapVector);
+    } else if (vector instanceof StructVector) {
+      StructVector structVector = (StructVector) vector;
+      accessor = new StructAccessor(structVector);
 
-      childColumns = new ArrowColumnVector[mapVector.size()];
+      childColumns = new ArrowColumnVector[structVector.size()];
       for (int i = 0; i < childColumns.length; ++i) {
-        childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
+        childColumns[i] = new ArrowColumnVector(structVector.getVectorById(i));
       }
     } else {
       throw new UnsupportedOperationException();
@@ -472,7 +472,7 @@ public final class ArrowColumnVector extends ColumnVector {
    */
   private static class StructAccessor extends ArrowVectorAccessor {
 
-    StructAccessor(NullableMapVector vector) {
+    StructAccessor(StructVector vector) {
       super(vector);
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
index 3de6ea8..8dd484a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
 
 import org.apache.arrow.vector._
 import org.apache.arrow.vector.complex._
-import org.apache.arrow.vector.types.pojo.ArrowType
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -62,7 +61,7 @@ object ArrowWriter {
       case (ArrayType(_, _), vector: ListVector) =>
         val elementVector = createFieldWriter(vector.getDataVector())
         new ArrayWriter(vector, elementVector)
-      case (StructType(_), vector: NullableMapVector) =>
+      case (StructType(_), vector: StructVector) =>
         val children = (0 until vector.size()).map { ordinal =>
           createFieldWriter(vector.getChildByOrdinal(ordinal))
         }
@@ -129,20 +128,7 @@ private[arrow] abstract class ArrowFieldWriter {
   }
 
   def reset(): Unit = {
-    // TODO: reset() should be in a common interface
-    valueVector match {
-      case fixedWidthVector: BaseFixedWidthVector => fixedWidthVector.reset()
-      case variableWidthVector: BaseVariableWidthVector => variableWidthVector.reset()
-      case listVector: ListVector =>
-        // Manual "reset" the underlying buffer.
-        // TODO: When we upgrade to Arrow 0.10.0, we can simply remove this and call
-        // `listVector.reset()`.
-        val buffers = listVector.getBuffers(false)
-        buffers.foreach(buf => buf.setZero(0, buf.capacity()))
-        listVector.setValueCount(0)
-        listVector.setLastSet(0)
-      case _ =>
-    }
+    valueVector.reset()
     count = 0
   }
 }
@@ -323,7 +309,7 @@ private[arrow] class ArrayWriter(
 }
 
 private[arrow] class StructWriter(
-    val valueVector: NullableMapVector,
+    val valueVector: StructVector,
     children: Array[ArrowFieldWriter]) extends ArrowFieldWriter {
 
   override def setNull(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
index b55489c..4592a16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
@@ -336,7 +336,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite {
     val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
     val schema = new StructType().add("int", IntegerType).add("long", LongType)
     val vector = ArrowUtils.toArrowField("struct", schema, nullable = false, null)
-      .createVector(allocator).asInstanceOf[NullableMapVector]
+      .createVector(allocator).asInstanceOf[StructVector]
 
     vector.allocateNew()
     val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector]
@@ -373,7 +373,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite {
     val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
     val schema = new StructType().add("int", IntegerType).add("long", LongType)
     val vector = ArrowUtils.toArrowField("struct", schema, nullable = true, null)
-      .createVector(allocator).asInstanceOf[NullableMapVector]
+      .createVector(allocator).asInstanceOf[StructVector]
     vector.allocateNew()
     val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector]
     val longVector = vector.getChildByOrdinal(1).asInstanceOf[BigIntVector]

