Wei Liu created SPARK-43796:
-------------------------------
Summary: Streaming ForeachWriter can't accept a custom user-defined
class
Key: SPARK-43796
URL: https://issues.apache.org/jira/browse/SPARK-43796
Project: Spark
Issue Type: Bug
Components: Connect, Structured Streaming
Affects Versions: 3.5.0
Reporter: Wei Liu
[https://github.com/apache/spark/pull/41129]
The last example in the description of the PR linked above doesn't work
with the current REPL implementation.
Code:
{code:java}
import org.apache.spark.sql.{ForeachWriter, Row}
import java.io._
val filePath = "/home/wei.liu/test_foreach/output-custom"
case class MyTestClass(value: Int) {
override def toString: String = value.toString
}
val writer = new ForeachWriter[MyTestClass] {
var fileWriter: FileWriter = _
def open(partitionId: Long, version: Long): Boolean = {
fileWriter = new FileWriter(filePath, true)
true
}
def process(row: MyTestClass): Unit = {
fileWriter.write(row.toString)
fileWriter.write("\n")
}
def close(errorOrNull: Throwable): Unit = {
fileWriter.close()
}
}
val df = spark.readStream .format("rate") .option("rowsPerSecond", "10") .load()
val query = df .selectExpr("CAST(value AS INT)") .as[MyTestClass] .writeStream
.foreach(writer) .outputMode("update") .start()
{code}
Error:
{code:java}
23/05/24 19:17:31 ERROR Utils: Aborting task
java.lang.NoClassDefFoundError: Could not initialize class ammonite.$sess.cmd4$
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:35)
at
org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:30)
at org.apache.spark.util.Utils$.classForName(Utils.scala:94)
at
org.apache.spark.sql.catalyst.encoders.OuterScopes$.$anonfun$getOuterScope$1(OuterScopes.scala:59)
at
org.apache.spark.sql.catalyst.expressions.objects.NewInstance.$anonfun$doGenCode$1(objects.scala:598)
at scala.Option.map(Option.scala:230)
at
org.apache.spark.sql.catalyst.expressions.objects.NewInstance.doGenCode(objects.scala:598)
at
org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:201)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:196)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.$anonfun$create$1(GenerateSafeProjection.scala:156)
at scala.collection.immutable.List.map(List.scala:293)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:153)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:39)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1369)
at
org.apache.spark.sql.catalyst.expressions.SafeProjection$.createCodeGeneratedObject(Projection.scala:171)
at
org.apache.spark.sql.catalyst.expressions.SafeProjection$.createCodeGeneratedObject(Projection.scala:168)
at
org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:51)
at
org.apache.spark.sql.catalyst.expressions.SafeProjection$.create(Projection.scala:194)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:173)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:166)
at
org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.write(ForeachWriterTable.scala:147)
at
org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.write(ForeachWriterTable.scala:132)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:493)
at
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:448)
at
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1521)
at
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
at
org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
at
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1487)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) {code}
This issue is similar to SPARK-43198.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]