Hello folks,

I would like to ask the Spark devs whether it is possible to explicitly define the key/value types of a map (Spark 3.3.0), as shown below:

    import org.apache.spark.sql.functions.{expr, collect_list}

    // run in spark-shell, so spark.implicits._ is in scope for toDF and $
    val df = Seq(
      (1, Map("k1" -> "v1", "k2" -> "v3")),
      (1, Map("k3" -> "v3")),
      (2, Map("k4" -> "v4")),
      (2, Map("k6" -> "v6", "k5" -> "v5"))
    ).toDF("id", "data")

    val mergeExpr = expr("aggregate(data, map(), (acc, i) -> map_concat(acc, i))")

    df.groupBy("id").agg(collect_list("data").as("data"))
      .select($"id", mergeExpr.as("merged_data"))
      .show(false)

The above code throws the following error:

    AnalysisException: cannot resolve 'aggregate(`data`, map(),
    lambdafunction(map_concat(namedlambdavariable(), namedlambdavariable()),
    namedlambdavariable(), namedlambdavariable()),
    lambdafunction(namedlambdavariable(), namedlambdavariable()))' due to data
    type mismatch: argument 3 requires map<null,null> type, however,
    'lambdafunction(map_concat(namedlambdavariable(), namedlambdavariable()),
    namedlambdavariable(), namedlambdavariable())' is of map<string,string> type.;
    Project [id#110, aggregate(data#119, map(),
      lambdafunction(map_concat(cast(lambda acc#122 as map<string,string>),
      lambda i#123), lambda acc#122, lambda i#123, false),
      lambdafunction(lambda id#124, lambda id#124, false)) AS aggregate(data, map(),
      lambdafunction(map_concat(namedlambdavariable(), namedlambdavariable()),
      namedlambdavariable(), namedlambdavariable()),
      lambdafunction(namedlambdavariable(), namedlambdavariable()))#125]
    +- Aggregate [id#110], [id#110, collect_list(data#111, 0, 0) AS data#119]
       +- Project [_1#105 AS id#110, _2#106 AS data#111]
          +- LocalRelation [_1#105, _2#106]

It seems that map() is initialised as map<null,null> where map<string,string> is expected. I believe this behaviour changed after 2.4.5, where map() was initialised as map<string,string> and the example above worked.

Is it possible to create a map by specifying its key/value types explicitly?
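One idea I have been toying with is to CAST the empty accumulator to the desired type; a minimal sketch of that idea follows (I have not confirmed this is the intended approach on 3.3.0):

    // Sketch: cast the empty map so that aggregate() sees map<string,string>
    // as the accumulator type instead of map<null,null>.
    val castMergeExpr = expr(
      "aggregate(data, cast(map() as map<string, string>), (acc, i) -> map_concat(acc, i))")

    df.groupBy("id").agg(collect_list("data").as("data"))
      .select($"id", castMergeExpr.as("merged_data"))
      .show(false)

I also came across spark.sql.legacy.createEmptyCollectionUsingStringType in the 3.0 migration notes, which, if I read it correctly, restores the 2.4 behaviour of typing empty map()/array() as string, but a per-expression solution would still be nicer.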
So far, I have come up with a workaround: initialise the accumulator with map('', '') to pin the key/value types to string, and then use map_filter() to remove the redundant ('', '') entry from the result:

    val mergeExpr = expr(
      "map_filter(aggregate(data, map('', ''), (acc, i) -> map_concat(acc, i)), (k, v) -> k != '')")

Thank you for your help.

Greetings,
Alex
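P.S. In case it is useful for comparison: the Scala DSL seems to sidestep the issue, since typedLit can produce a properly typed empty map literal for the accumulator. A minimal sketch (my own attempt, not necessarily the recommended way):

    import org.apache.spark.sql.functions.{aggregate, col, collect_list, map_concat, typedLit}

    // typedLit(Map.empty[String, String]) yields a map<string,string> literal,
    // so aggregate() no longer has to infer the accumulator type from map().
    val mergedCol = aggregate(
      col("data"),
      typedLit(Map.empty[String, String]),
      (acc, i) => map_concat(acc, i))

    df.groupBy("id").agg(collect_list("data").as("data"))
      .select($"id", mergedCol.as("merged_data"))
      .show(false)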