[
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Svyatoslav Semenyuk updated SPARK-43514:
----------------------------------------
Description:
We designed a function that joins two DFs on a common column using similarity matching. All code below is Scala 2.12.
I've added {{show}} calls for demonstration purposes.
{code:scala}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, MinHashLSHModel, NGram, RegexTokenizer}
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col

/**
 * Joins two data frames on a string column using the LSH algorithm
 * for similarity computation.
 *
 * If the input data frames have columns with identical names,
 * the resulting data frame will have the columns from both of them,
 * prefixed with `datasetA` and `datasetB` respectively.
 *
 * For example, if both data frames have a column named `myColumn`,
 * then the result will have columns `datasetAMyColumn` and `datasetBMyColumn`.
 */
def similarityJoin(
    df: DataFrame,
    anotherDf: DataFrame,
    joinExpr: String,
    threshold: Double = 0.8,
): DataFrame = {
  df.show(false)
  anotherDf.show(false)

  // Character-level 3-gram MinHash LSH pipeline over the join column.
  val pipeline = new Pipeline().setStages(Array(
    new RegexTokenizer()
      .setPattern("")
      .setMinTokenLength(1)
      .setInputCol(joinExpr)
      .setOutputCol("tokens"),
    new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
    new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
    new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
  ))

  val model = pipeline.fit(df)
  val storedHashed = model.transform(df)
  val landedHashed = model.transform(anotherDf)
  val commonColumns = df.columns.toSet & anotherDf.columns.toSet

  /** Converts a column name from an input data frame to the column of the resulting dataset. */
  def convertColumn(datasetName: String)(columnName: String): Column = {
    val newName =
      if (commonColumns.contains(columnName)) s"$datasetName${columnName.capitalize}"
      else columnName
    col(s"$datasetName.$columnName") as newName
  }

  val columnsToSelect = df.columns.map(convertColumn("datasetA")) ++
    anotherDf.columns.map(convertColumn("datasetB"))

  val result = model
    .stages
    .last
    .asInstanceOf[MinHashLSHModel]
    .approxSimilarityJoin(storedHashed, landedHashed, threshold, "confidence")
    .select(columnsToSelect.toSeq: _*)

  result.show(false)
  result
}
{code}
Now consider this simple example:
{code:scala}
import org.apache.spark.sql.functions.length
import spark.implicits._  // assumes a `spark` session in scope, as in Zeppelin

val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"

similarityJoin(inputDF1, inputDF2, "name", 0.6)
{code}
This example runs with no errors and outputs three empty DFs. Now let's add a {{distinct}} call to one of the data frames:
{code:scala}
val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 2) as "df1"
val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"

similarityJoin(inputDF1, inputDF2, "name", 0.6)
{code}
This example outputs two empty DFs and then fails at {{result.show(false)}}.
Error:
{code:none}
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (LSHModel$$Lambda$3769/0x0000000101804840: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
  ... many elided
Caused by: java.lang.IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.
  at scala.Predef$.require(Predef.scala:281)
  at org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
  at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
  ... many more
{code}
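For context, this message matches what {{MinHashLSH}} does with an all-zero vector: an empty string produces no 3-grams, so {{HashingTF}} yields a zero vector. A minimal sketch of ours (reusing the imports above, not code from our application) reproduces the same requirement failure by skipping the length filter entirely:
{code:scala}
// Sketch: rebuild the same stages as in similarityJoin and transform a
// data frame that still contains the empty string (no length filter).
// The "" row yields no 3-grams, hence a zero vector, and MinHashLSH
// requires at least one non-zero entry.
val probePipeline = new Pipeline().setStages(Array(
  new RegexTokenizer().setPattern("").setMinTokenLength(1)
    .setInputCol("name").setOutputCol("tokens"),
  new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
  new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
  new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
))
val unfiltered = Seq("abcdef", "").toDF("name")
probePipeline.fit(unfiltered).transform(unfiltered).show(false)
// => IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.
{code}
This suggests that in the failing plan the rows removed by the filter still reach the model's UDF, though we have not confirmed where exactly the reordering happens.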
----
Now let's look at an example that is close to our application code.
Define some helper functions:
{code:scala}
import org.apache.spark.sql.functions.{explode, length, struct, to_timestamp, transform}

def process1(df: DataFrame): Unit = {
  val companies = df.select($"id", $"name")
  val directors = df
    .select(explode($"directors"))
    .select($"col.name", $"col.id")
    .dropDuplicates("id")

  val toBeMatched1 = companies
    .filter(length($"name") > 2)
    .select(
      $"name",
      $"id" as "sourceLegalEntityId",
    )
  val toBeMatched2 = directors
    .filter(length($"name") > 2)
    .select(
      $"name",
      $"id" as "directorId",
    )

  similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
}

def process2(df: DataFrame): Unit = {
  // Parses the string `date` field of every element of an array-of-structs column.
  def processFinancials(column: Column): Column = {
    transform(
      column,
      x => x.withField("date", to_timestamp(x("date"), "dd MMM yyyy")),
    )
  }

  val companies = df.select(
    $"id",
    $"name",
    struct(
      processFinancials($"financials.balanceSheet") as "balanceSheet",
      processFinancials($"financials.capitalAndReserves") as "capitalAndReserves",
    ) as "financials",
  )
  val directors = df
    .select(explode($"directors"))
    .select($"col.name", $"col.id")
    .dropDuplicates("id")

  val toBeMatched1 = companies
    .filter(length($"name") > 2)
    .select(
      $"name",
      $"id" as "sourceLegalEntityId",
    )
  val toBeMatched2 = directors
    .filter(length($"name") > 2)
    .select(
      $"name",
      $"id" as "directorId",
    )

  similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
}
{code}
The function {{process2}} does the same job as {{process1}}, but additionally transforms the {{financials}} column before executing the similarity join.
Example data frame and its schema:
{code:scala}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("name", StringType),
  StructField(
    "directors",
    ArrayType(
      StructType(Seq(StructField("id", StringType), StructField("name", StringType))),
      containsNull = true,
    ),
  ),
  StructField(
    "financials",
    StructType(Seq(
      StructField(
        "balanceSheet",
        ArrayType(
          StructType(Seq(
            StructField("date", StringType),
            StructField("value", StringType),
          )),
          containsNull = true,
        ),
      ),
      StructField(
        "capitalAndReserves",
        ArrayType(
          StructType(Seq(
            StructField("date", StringType),
            StructField("value", StringType),
          )),
          containsNull = true,
        ),
      ),
    )),
  ),
))

val mainDF = (1 to 10)
  .toDF("data")
  .withColumn("data", lit(null) cast schema)
  .select("data.*")
{code}
This code just makes a data frame with 10 rows of a null column cast to the specified schema.
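A quick sanity check of the constructed frame (illustration only):
{code:scala}
mainDF.printSchema()
println(mainDF.count())  // 10
mainDF.show(3, truncate = false)  // every column is null
{code}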
Now let's pass {{mainDF}} to the previously defined functions and observe the results.
Example 1:
{code:scala}
process1(mainDF)
{code}
Outputs three empty DFs, no errors.
Example 2:
{code:scala}
process1(mainDF.distinct())
{code}
Outputs two empty DFs and then fails at {{result.show(false)}}. Error:
{code:none}
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => array<string>).
  ... many elided
Caused by: java.lang.NullPointerException
  at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
  ... many more
{code}
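The NPE is consistent with the tokenizer's UDF receiving a {{null}} despite the {{length($"name") > 2}} filter. A standalone sketch of ours showing that {{RegexTokenizer}} fails the same way on a null value:
{code:scala}
// Sketch: RegexTokenizer alone, applied to a null value, raises the same
// NullPointerException from the lambda in createTransformFunc.
val tokenizer = new RegexTokenizer()
  .setPattern("")
  .setMinTokenLength(1)
  .setInputCol("name")
  .setOutputCol("tokens")
tokenizer.transform(Seq[String](null).toDF("name")).show()
// => FAILED_EXECUTE_UDF ... Caused by: java.lang.NullPointerException
{code}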
Example 3:
{code:scala}
process2(mainDF)
{code}
Outputs two empty DFs and then fails at {{result.show(false)}}. Error:
{code:none}
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (RegexTokenizer$$Lambda$3266/0x0000000101620040: (string) => array<string>).
  ... many elided
Caused by: java.lang.NullPointerException
  at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
  ... many more
{code}
Somehow, the presence of the {{distinct}} DF method or the {{transform}} (or {{to_timestamp}}) SQL function before the similarity join causes it to fail on empty input data frames. If the same operations are performed after the join, no errors are emitted.
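One way to investigate is to compare the optimized plans of a failing and a passing run; a sketch (our addition, placed just before {{result.show(false)}} in {{similarityJoin}}):
{code:scala}
// Sketch: print the parsed/analyzed/optimized plans to see where the
// optimizer places the length filters relative to the projections that
// contain the ML transform UDFs.
result.explain(true)
{code}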
----
Current workaround: call the {{distinct}} DF method and the {{transform}} SQL function after the similarity join.
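Roughly, the reordering looks like this (a sketch using the locals from {{process1}}; the {{transform}}/{{to_timestamp}} processing of {{financials}} is moved after the join the same way):
{code:scala}
// Sketch of the workaround: deduplicate after the similarity join instead
// of before it. `directorId` survives the join unprefixed because it is
// not a common column of the two inputs.
val joined = similarityJoin(toBeMatched1, toBeMatched2, "name", 0.6)
val deduped = joined.dropDuplicates("directorId")
{code}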
> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions
> ----------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
> Issue Type: Bug
> Components: ML, SQL
> Affects Versions: 3.3.1, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1; Spark 3.3.1 deployed on a cluster was used to check the issue with real data.
> Reporter: Svyatoslav Semenyuk
> Priority: Major
> Labels: ml, sql