Repository: spark Updated Branches: refs/heads/master 92fd7f321 -> ed075e1ff
[SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10.0 ## What changes were proposed in this pull request? Upgrade Apache Arrow to 0.10.0 Version 0.10.0 has a number of bug fixes and improvements with the following pertaining directly to usage in Spark: * Allow for adding BinaryType support ARROW-2141 * Bug fix related to array serialization ARROW-1973 * Python2 str will be made into an Arrow string instead of bytes ARROW-2101 * Python bytearrays are supported as input to pyarrow ARROW-2141 * Java has common interface for reset to cleanup complex vectors in Spark ArrowWriter ARROW-1962 * Cleanup pyarrow type equality checks ARROW-2423 * ArrowStreamWriter should not hold references to ArrowBlocks ARROW-2632, ARROW-2645 * Improved low level handling of messages for RecordBatch ARROW-2704 ## How was this patch tested? existing tests Author: Bryan Cutler <cutl...@gmail.com> Closes #21939 from BryanCutler/arrow-upgrade-010. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed075e1f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed075e1f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed075e1f Branch: refs/heads/master Commit: ed075e1ff60cbb3e7b80b9d2f2ff37054412b934 Parents: 92fd7f3 Author: Bryan Cutler <cutl...@gmail.com> Authored: Tue Aug 14 17:13:38 2018 -0700 Committer: Bryan Cutler <cutl...@gmail.com> Committed: Tue Aug 14 17:13:38 2018 -0700 ---------------------------------------------------------------------- dev/deps/spark-deps-hadoop-2.6 | 6 +++--- dev/deps/spark-deps-hadoop-2.7 | 6 +++--- dev/deps/spark-deps-hadoop-3.1 | 6 +++--- pom.xml | 2 +- python/pyspark/serializers.py | 2 ++ .../spark/sql/vectorized/ArrowColumnVector.java | 12 ++++++------ .../spark/sql/execution/arrow/ArrowWriter.scala | 20 +++----------------- .../vectorized/ArrowColumnVectorSuite.scala | 4 ++-- 8 files changed, 23 insertions(+), 35 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/dev/deps/spark-deps-hadoop-2.6 ---------------------------------------------------------------------- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 3c0952f..bdab79c 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar api-asn1-api-1.0.0-M20.jar api-util-1.0.0-M20.jar arpack_combined_all-0.1.jar -arrow-format-0.8.0.jar -arrow-memory-0.8.0.jar -arrow-vector-0.8.0.jar +arrow-format-0.10.0.jar +arrow-memory-0.10.0.jar +arrow-vector-0.10.0.jar automaton-1.11-8.jar avro-1.8.2.jar avro-ipc-1.8.2.jar http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/dev/deps/spark-deps-hadoop-2.7 ---------------------------------------------------------------------- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index 310f1e4..ddaf9bb 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar api-asn1-api-1.0.0-M20.jar api-util-1.0.0-M20.jar arpack_combined_all-0.1.jar -arrow-format-0.8.0.jar -arrow-memory-0.8.0.jar -arrow-vector-0.8.0.jar +arrow-format-0.10.0.jar +arrow-memory-0.10.0.jar +arrow-vector-0.10.0.jar automaton-1.11-8.jar avro-1.8.2.jar avro-ipc-1.8.2.jar http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/dev/deps/spark-deps-hadoop-3.1 ---------------------------------------------------------------------- diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1 index 9bff2a1..d25d7aa 100644 --- a/dev/deps/spark-deps-hadoop-3.1 +++ b/dev/deps/spark-deps-hadoop-3.1 @@ -12,9 +12,9 @@ aopalliance-1.0.jar aopalliance-repackaged-2.4.0-b34.jar apache-log4j-extras-1.2.17.jar arpack_combined_all-0.1.jar -arrow-format-0.8.0.jar -arrow-memory-0.8.0.jar -arrow-vector-0.8.0.jar +arrow-format-0.10.0.jar 
+arrow-memory-0.10.0.jar +arrow-vector-0.10.0.jar automaton-1.11-8.jar avro-1.8.2.jar avro-ipc-1.8.2.jar http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 45fca28..979d709 100644 --- a/pom.xml +++ b/pom.xml @@ -190,7 +190,7 @@ If you are changing Arrow version specification, please check ./python/pyspark/sql/utils.py, ./python/run-tests.py and ./python/setup.py too. --> - <arrow.version>0.8.0</arrow.version> + <arrow.version>0.10.0</arrow.version> <test.java.home>${java.home}</test.java.home> <test.exclude.tags></test.exclude.tags> http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/python/pyspark/serializers.py ---------------------------------------------------------------------- diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 82abf19..47c4c3e 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -229,12 +229,14 @@ def _create_batch(series, timezone): def create_array(s, t): mask = s.isnull() # Ensure timestamp series are in expected form for Spark internal representation + # TODO: maybe don't need None check anymore as of Arrow 0.9.1 if t is not None and pa.types.is_timestamp(t): s = _check_series_convert_timestamps_internal(s.fillna(0), timezone) # TODO: need cast after Arrow conversion, ns values cause error with pandas 0.19.2 return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False) elif t is not None and pa.types.is_string(t) and sys.version < '3': # TODO: need decode before converting to Arrow in Python 2 + # TODO: don't need as of Arrow 0.9.1 return pa.Array.from_pandas(s.apply( lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t) elif t is not None and pa.types.is_decimal(t) and \ http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java 
---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java index 5aed87f..1c9beda 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java @@ -162,13 +162,13 @@ public final class ArrowColumnVector extends ColumnVector { } else if (vector instanceof ListVector) { ListVector listVector = (ListVector) vector; accessor = new ArrayAccessor(listVector); - } else if (vector instanceof NullableMapVector) { - NullableMapVector mapVector = (NullableMapVector) vector; - accessor = new StructAccessor(mapVector); + } else if (vector instanceof StructVector) { + StructVector structVector = (StructVector) vector; + accessor = new StructAccessor(structVector); - childColumns = new ArrowColumnVector[mapVector.size()]; + childColumns = new ArrowColumnVector[structVector.size()]; for (int i = 0; i < childColumns.length; ++i) { - childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i)); + childColumns[i] = new ArrowColumnVector(structVector.getVectorById(i)); } } else { throw new UnsupportedOperationException(); @@ -472,7 +472,7 @@ public final class ArrowColumnVector extends ColumnVector { */ private static class StructAccessor extends ArrowVectorAccessor { - StructAccessor(NullableMapVector vector) { + StructAccessor(StructVector vector) { super(vector); } } http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala index 3de6ea8..8dd484a 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala @@ -21,7 +21,6 @@ import scala.collection.JavaConverters._ import org.apache.arrow.vector._ import org.apache.arrow.vector.complex._ -import org.apache.arrow.vector.types.pojo.ArrowType import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters @@ -62,7 +61,7 @@ object ArrowWriter { case (ArrayType(_, _), vector: ListVector) => val elementVector = createFieldWriter(vector.getDataVector()) new ArrayWriter(vector, elementVector) - case (StructType(_), vector: NullableMapVector) => + case (StructType(_), vector: StructVector) => val children = (0 until vector.size()).map { ordinal => createFieldWriter(vector.getChildByOrdinal(ordinal)) } @@ -129,20 +128,7 @@ private[arrow] abstract class ArrowFieldWriter { } def reset(): Unit = { - // TODO: reset() should be in a common interface - valueVector match { - case fixedWidthVector: BaseFixedWidthVector => fixedWidthVector.reset() - case variableWidthVector: BaseVariableWidthVector => variableWidthVector.reset() - case listVector: ListVector => - // Manual "reset" the underlying buffer. - // TODO: When we upgrade to Arrow 0.10.0, we can simply remove this and call - // `listVector.reset()`. 
- val buffers = listVector.getBuffers(false) - buffers.foreach(buf => buf.setZero(0, buf.capacity())) - listVector.setValueCount(0) - listVector.setLastSet(0) - case _ => - } + valueVector.reset() count = 0 } } @@ -323,7 +309,7 @@ private[arrow] class ArrayWriter( } private[arrow] class StructWriter( - val valueVector: NullableMapVector, + val valueVector: StructVector, children: Array[ArrowFieldWriter]) extends ArrowFieldWriter { override def setNull(): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/ed075e1f/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala index b55489c..4592a16 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala @@ -336,7 +336,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite { val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue) val schema = new StructType().add("int", IntegerType).add("long", LongType) val vector = ArrowUtils.toArrowField("struct", schema, nullable = false, null) - .createVector(allocator).asInstanceOf[NullableMapVector] + .createVector(allocator).asInstanceOf[StructVector] vector.allocateNew() val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector] @@ -373,7 +373,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite { val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue) val schema = new StructType().add("int", IntegerType).add("long", LongType) val vector = ArrowUtils.toArrowField("struct", schema, nullable = true, null) - 
.createVector(allocator).asInstanceOf[NullableMapVector] + .createVector(allocator).asInstanceOf[StructVector] vector.allocateNew() val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector] val longVector = vector.getChildByOrdinal(1).asInstanceOf[BigIntVector] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org