[ https://issues.apache.org/jira/browse/SPARK-20254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-20254:
------------------------------------
Assignee: (was: Apache Spark)
> SPARK-19716 generates unnecessary data conversion for Dataset with primitive array
> -----------------------------------------------------------------------------------
>
> Key: SPARK-20254
> URL: https://issues.apache.org/jira/browse/SPARK-20254
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Kazuaki Ishizaki
>
> Since SPARK-19716 introduced {{unresolvedmapobjects}}, the current implementation generates a {{mapobjects()}} expression in the {{DeserializeToObject}} node of the {{Analyzed Logical Plan}}. This {{mapobjects()}} emits Java code that copies the array element by element into a {{GenericArrayData}}, which is unnecessary data conversion for a primitive array: before this change, deserialization was a single {{toDoubleArray}} call.
> cc: [~cloud_fan]
>
> {code}
> val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
> ds.count
> val ds2 = ds.map(e => e)
> ds2.explain(true)
> ds2.show
> {code}
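> To inspect the Java code that codegen produces for this query, Spark's debug helper can dump it (a minimal sketch; {{debugCodegen}} is provided by {{org.apache.spark.sql.execution.debug}} in Spark 2.x):
> {code}
> import org.apache.spark.sql.execution.debug._
>
> // Prints the generated Java source for each whole-stage-codegen subtree,
> // including the DeserializeToObject loop excerpted at the end of this report.
> ds2.debugCodegen()
> {code}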
> Plans before SPARK-19716 ({{DeserializeToObject}} deserializes with a single direct {{value#2.toDoubleArray}} call):
> {code}
> == Parsed Logical Plan ==
> 'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- 'DeserializeToObject unresolveddeserializer(upcast(getcolumnbyordinal(0, ArrayType(DoubleType,false)), ArrayType(DoubleType,false), - root class: "scala.Array").toDoubleArray), obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Analyzed Logical Plan ==
> value: array<double>
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject cast(value#2 as array<double>).toDoubleArray, obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Optimized Logical Plan ==
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject value#2.toDoubleArray, obj#23: [D
>       +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>             +- Scan ExternalRDDScan[obj#1]
> == Physical Plan ==
> *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- *MapElements <function1>, obj#24: [D
>    +- *DeserializeToObject value#2.toDoubleArray, obj#23: [D
>       +- *InMemoryTableScan [value#2]
>          +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>             +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                +- Scan ExternalRDDScan[obj#1]
> {code}
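> A quick programmatic check for which deserializer the analyzer picked is to look for a {{MapObjects}} expression in the analyzed plan (a sketch against Spark 2.2 catalyst internals; {{hasMapObjects}} is an illustrative name):
> {code}
> import org.apache.spark.sql.catalyst.expressions.objects.MapObjects
>
> // True when the analyzed plan contains a mapobjects(...) deserializer,
> // i.e. the per-element copy introduced by SPARK-19716.
> val hasMapObjects = ds2.queryExecution.analyzed.expressions.exists(e =>
>   e.collectFirst { case m: MapObjects => m }.isDefined)
> {code}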
> Plans after SPARK-19716 (the deserializer is now wrapped in {{mapobjects(...)}}, which copies the array element by element):
> {code}
> == Parsed Logical Plan ==
> 'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- 'DeserializeToObject unresolveddeserializer(unresolvedmapobjects(<function1>, getcolumnbyordinal(0, ArrayType(DoubleType,false)), None).toDoubleArray), obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Analyzed Logical Plan ==
> value: array<double>
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Optimized Logical Plan ==
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
>       +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>             +- Scan ExternalRDDScan[obj#1]
> == Physical Plan ==
> *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- *MapElements <function1>, obj#24: [D
>    +- *DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
>       +- InMemoryTableScan [value#2]
>          +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>             +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                +- Scan ExternalRDDScan[obj#1]
> {code}
> The code generated for {{DeserializeToObject}} after SPARK-19716 (excerpt; the {{/*###*/}} marker flags the line that stores the copied elements into {{GenericArrayData}}):
> {code:java}
> ...
> /* 056 */     ArrayData deserializetoobject_value1 = null;
> /* 057 */
> /* 058 */     if (!inputadapter_isNull) {
> /* 059 */       int deserializetoobject_dataLength = inputadapter_value.numElements();
> /* 060 */
> /* 061 */       Double[] deserializetoobject_convertedArray = null;
> /* 062 */       deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength];
> /* 063 */
> /* 064 */       int deserializetoobject_loopIndex = 0;
> /* 065 */       while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
> /* 066 */         MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex));
> /* 067 */         MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
> /* 068 */
> /* 069 */         if (MapObjects_loopIsNull2) {
> /* 070 */           throw new RuntimeException(((java.lang.String) references[0]));
> /* 071 */         }
> /* 072 */         if (false) {
> /* 073 */           deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
> /* 074 */         } else {
> /* 075 */           deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2;
> /* 076 */         }
> /* 077 */
> /* 078 */         deserializetoobject_loopIndex += 1;
> /* 079 */       }
> /* 080 */
> /* 081 */       deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/
> /* 082 */     }
> ...
> {code}
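> To see why this is wasteful for a primitive array, here is a minimal sketch of the two conversion paths outside of codegen (assuming the Spark 2.2 catalyst APIs shown in the plans above; variable names are illustrative):
> {code}
> import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
> import org.apache.spark.sql.catalyst.util.GenericArrayData
>
> val unsafe = UnsafeArrayData.fromPrimitiveArray(Array(1.1, 2.2))
>
> // After SPARK-19716, the generated loop effectively does this: box every
> // element into an Object array, wrap it in GenericArrayData, and unbox it
> // all again in toDoubleArray.
> val boxed = Array.tabulate[Any](unsafe.numElements())(i => unsafe.getDouble(i))
> val viaGeneric: Array[Double] = new GenericArrayData(boxed).toDoubleArray
>
> // Before SPARK-19716, deserialization was one direct primitive copy.
> val direct: Array[Double] = unsafe.toDoubleArray
> {code}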