[ https://issues.apache.org/jira/browse/SPARK-38823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17521414#comment-17521414 ]
Bruce Robbins commented on SPARK-38823:
---------------------------------------
This appears to be an optimization bug that results in corruption of the
buffers in {{AggregationIterator}}.
On master and 3.3, {{NewInstance}} with no arguments is considered foldable. As
a result, the {{ConstantFolding}} rule turns {{NewInstance}} into a {{Literal}}
holding an instance of the user-specified Java bean class. That instance becomes
a singleton that is reused for every input record (although its fields are
updated by {{InitializeJavaBean}}).
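Here is a minimal Java sketch of the reuse. It assumes a heavily simplified model, not Spark's generated code. The hypothetical {{FoldedDeserializer}} stands in for a {{NewInstance}} that has been folded into a {{Literal}}: one bean is created up front and each record only overwrites its fields. The hypothetical {{FreshDeserializer}} stands in for a non-foldable {{NewInstance}}. {{FoldingModel}}, both deserializers, and this {{Item}} bean are illustrative names only.

```java
// Hypothetical model of two deserializer behaviours; none of these classes exist in Spark.
final class FoldingModel {

    // Mutable bean standing in for the user's Java bean.
    static final class Item {
        String key;
        int value;

        @Override
        public String toString() {
            return "Item(\"" + key + "\", " + value + ")";
        }
    }

    // Models NewInstance folded into a Literal: a single instance is created once,
    // and each record only overwrites its fields.
    static final class FoldedDeserializer {
        private final Item literal = new Item();

        Item deserialize(String key, int value) {
            literal.key = key;
            literal.value = value;
            return literal;
        }
    }

    // Models a non-foldable NewInstance: a new bean for every record.
    static final class FreshDeserializer {
        Item deserialize(String key, int value) {
            Item item = new Item();
            item.key = key;
            item.value = value;
            return item;
        }
    }

    public static void main(String[] args) {
        FoldedDeserializer folded = new FoldedDeserializer();
        Item first = folded.deserialize("a", 1);
        Item second = folded.deserialize("b", 3);
        // Same object, so the first "record" now reads as Item("b", 3).
        System.out.println((first == second) + " " + first); // prints: true Item("b", 3)

        FreshDeserializer fresh = new FreshDeserializer();
        Item third = fresh.deserialize("a", 1);
        Item fourth = fresh.deserialize("b", 3);
        // Distinct objects, so the first record keeps its own values.
        System.out.println((third == fourth) + " " + third); // prints: false Item("a", 1)
    }
}
```

Any code that holds on to a record from the folded deserializer sees it silently change when the next record is read.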
Because the instance is reused, multiple buffers in {{AggregationIterator}} can
end up referring to the same Java bean instance.
Take, for example, the test I added
[here|https://github.com/bersprockets/spark/blob/17a8ad64f5bc39cb26d25b63f3692e7b8632baf8/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java#L560].
The input is:
{noformat}
List<Item> items = Arrays.asList(
new Item("a", 1),
new Item("b", 3),
new Item("c", 2),
new Item("a", 7));
{noformat}
As {{ObjectAggregationIterator}} reads the input, the buffers get set up as
follows (note that the first field of Item should be the same as the key):
{noformat}
- Read Item("a", 1)
- Buffers are now:
    Key "a" -> Item("a", 1)
- Read Item("b", 3)
- Buffers are now:
    Key "a" -> Item("b", 3)
    Key "b" -> Item("b", 3)
{noformat}
The buffer for key "a" now contains Item("b", 3). That's because both buffers
contain a reference to the same Item instance, and that Item instance's fields
were updated when {{Item("b", 3)}} was read.
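The two-record trace above can be reproduced with a small, hypothetical Java model. It assumes that every record is deserialized into one shared bean, and that the first record seen for a key becomes that key's buffer without being copied. {{BufferAliasing}}, {{deserialize}}, and {{readAll}} are illustrative names, not Spark APIs, and the real iterator does considerably more.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model of per-key buffers; not ObjectAggregationIterator itself.
final class BufferAliasing {

    static final class Item {
        String key;
        int value;

        @Override
        public String toString() {
            return "Item(\"" + key + "\", " + value + ")";
        }
    }

    // One shared bean, as if NewInstance had been folded into a Literal.
    private static final Item SHARED = new Item();

    static Item deserialize(String key, int value) {
        SHARED.key = key;
        SHARED.value = value;
        return SHARED;
    }

    // Reads records in order; the first record for each key is stored as that
    // key's buffer by reference, without a defensive copy.
    static Map<String, Item> readAll(String[] keys, int[] values) {
        Map<String, Item> buffers = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            Item record = deserialize(keys[i], values[i]);
            buffers.putIfAbsent(record.key, record);
        }
        return buffers;
    }

    public static void main(String[] args) {
        Map<String, Item> buffers = readAll(new String[] {"a", "b"}, new int[] {1, 3});
        System.out.println(buffers); // prints: {a=Item("b", 3), b=Item("b", 3)}
    }
}
```

Both map entries point at the same object, so reading {{Item("b", 3)}} also rewrites the buffer stored under key "a".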
When {{ObjectAggregationIterator}} finally calls the test's reduce function (for
the second "a" record), it passes the same Item instance ({{Item("a", 7)}}) as
both the buffer and the input record. At that point, the buffers for "a", "b",
and "c" all contain {{Item("a", 7)}}.
I _think_ the fix for this is to make {{NewInstance}} non-foldable. My linked
test passes with that change (and fails without it). I will run the unit tests
and hopefully make a PR tomorrow, assuming the proposed fix doesn't break
something else besides {{ConstantFoldingSuite}}.
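The following Java model runs the test's full four-record input through a simplified hash-based reduce twice: once with a folded (shared) bean and once with a fresh bean per record, the latter approximating the proposed non-foldable {{NewInstance}}. It is a sketch under stated assumptions. {{ReduceModel}}, {{reduceGroups}}, {{Result}}, and the summing reduce function are hypothetical, and Spark's real {{ObjectAggregationIterator}} and {{ReduceAggregator}} are more involved, so only the aliasing pattern (not the exact values) should be read from it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical, simplified model of a hash-based reduceGroups; not Spark's implementation.
final class ReduceModel {

    static final class Item {
        String key;
        int value;

        Item(String key, int value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "Item(\"" + key + "\", " + value + ")";
        }
    }

    static final class Result {
        final Map<String, Item> buffers;
        final boolean aliasedReduceCall; // reduce saw the same object as buffer and input

        Result(Map<String, Item> buffers, boolean aliasedReduceCall) {
            this.buffers = buffers;
            this.aliasedReduceCall = aliasedReduceCall;
        }
    }

    static Result reduceGroups(String[] keys, int[] values, boolean foldNewInstance) {
        Item shared = new Item(null, 0); // only used when foldNewInstance is true
        boolean[] aliased = {false};
        // Hypothetical reduce function that, like the reporter's addZ, returns a new Item.
        BinaryOperator<Item> reduce = (buffer, input) -> {
            if (buffer == input) {
                aliased[0] = true;
            }
            return new Item(buffer.key, buffer.value + input.value);
        };
        Map<String, Item> buffers = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            Item record;
            if (foldNewInstance) {
                // Folded NewInstance: overwrite the fields of the single shared bean.
                shared.key = keys[i];
                shared.value = values[i];
                record = shared;
            } else {
                // Non-foldable NewInstance: a fresh bean for every record.
                record = new Item(keys[i], values[i]);
            }
            Item buffer = buffers.get(record.key);
            buffers.put(record.key, buffer == null ? record : reduce.apply(buffer, record));
        }
        return new Result(buffers, aliased[0]);
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c", "a"};
        int[] values = {1, 3, 2, 7};
        for (boolean fold : new boolean[] {true, false}) {
            Result r = reduceGroups(keys, values, fold);
            System.out.println("fold=" + fold + " aliased=" + r.aliasedReduceCall + " " + r.buffers);
        }
    }
}
```

With folding, the model prints {{fold=true aliased=true {a=Item("a", 14), b=Item("a", 7), c=Item("a", 7)}}}: the buffers for "b" and "c" are corrupted and the reduce function receives one object twice. With a fresh bean per record, it prints {{fold=false aliased=false {a=Item("a", 8), b=Item("b", 3), c=Item("c", 2)}}}.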
> Incorrect result of dataset reduceGroups in java
> ------------------------------------------------
>
> Key: SPARK-38823
> URL: https://issues.apache.org/jira/browse/SPARK-38823
> Project: Spark
> Issue Type: Bug
> Components: Java API
> Affects Versions: 3.4.0
> Reporter: IKozar
> Priority: Major
>
> {code:java}
> @Data
> @NoArgsConstructor
> @AllArgsConstructor
> public static class Item implements Serializable {
> private String x;
> private String y;
> private int z;
> public Item addZ(int z) {
> return new Item(x, y, this.z + z);
> }
> } {code}
> {code:java}
> List<Item> items = List.of(
> new Item("X1", "Y1", 1),
> new Item("X2", "Y1", 1),
> new Item("X1", "Y1", 1),
> new Item("X2", "Y1", 1),
> new Item("X3", "Y1", 1),
> new Item("X1", "Y1", 1),
> new Item("X1", "Y2", 1),
> new Item("X2", "Y1", 1));
> Dataset<Item> ds = spark.createDataFrame(items,
> Item.class).as(Encoders.bean(Item.class));
> ds.groupByKey((MapFunction<Item, Tuple2<String, String>>) item ->
> Tuple2.apply(item.getX(), item.getY()),
> Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
> .reduceGroups((ReduceFunction<Item>) (item1, item2) ->
> item1.addZ(item2.getZ()))
> .show(10);
> {code}
> The result is:
> {noformat}
> +--------+----------------------------------------------+
> | key|ReduceAggregator(poc.job.JavaSparkReduce$Item)|
> +--------+----------------------------------------------+
> |{X1, Y1}| {X2, Y1, 2}|-- expected 3
> |{X2, Y1}| {X2, Y1, 2}|-- expected 3
> |{X1, Y2}| {X2, Y1, 1}|
> |{X3, Y1}| {X2, Y1, 1}|
> +--------+----------------------------------------------+{noformat}
> Note that the key doesn't match the value.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)