Wei Liu created SPARK-43796:
-------------------------------

             Summary: Streaming ForeachWriter can't accept custom user-defined class
                 Key: SPARK-43796
                 URL: https://issues.apache.org/jira/browse/SPARK-43796
             Project: Spark
          Issue Type: Bug
          Components: Connect, Structured Streaming
    Affects Versions: 3.5.0
            Reporter: Wei Liu


[https://github.com/apache/spark/pull/41129]

The last example in the PR description doesn't work with the current (Ammonite-based) REPL implementation.

Code:

 
{code:scala}
import org.apache.spark.sql.{ForeachWriter, Row}
import java.io._
// File the writer appends to; each record is written on its own line.
val filePath = "/home/wei.liu/test_foreach/output-custom"
// Case class declared inside the REPL session. Decoding rows into this type
// is where the query fails (see the stack trace below).
case class MyTestClass(value: Int) {
      override def toString: String = value.toString
}
// ForeachWriter that appends each MyTestClass record's toString, followed by
// a newline, to filePath.
val writer = new ForeachWriter[MyTestClass] {
    var fileWriter: FileWriter = _
    // The second FileWriter argument (true) opens filePath in append mode.
    def open(partitionId: Long, version: Long): Boolean = {
      fileWriter = new FileWriter(filePath, true)
      true
    }
    def process(row: MyTestClass): Unit = {
      fileWriter.write(row.toString)
      fileWriter.write("\n")
    }
    def close(errorOrNull: Throwable): Unit = {
      fileWriter.close()
    }
}
// Rate source producing 10 rows per second.
val df = spark.readStream .format("rate") .option("rowsPerSecond", "10") .load()
// Kept on a single line: if this chain is split before `.foreach`, entering it
// line by line in the REPL evaluates the first part as a complete statement
// and the continuation line fails to parse.
val query = df .selectExpr("CAST(value AS INT)") .as[MyTestClass] .writeStream .foreach(writer) .outputMode("update") .start()
{code}
Error:
{code:java}
23/05/24 19:17:31 ERROR Utils: Aborting task
java.lang.NoClassDefFoundError: Could not initialize class ammonite.$sess.cmd4$
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:35)
        at org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:30)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:94)
        at org.apache.spark.sql.catalyst.encoders.OuterScopes$.$anonfun$getOuterScope$1(OuterScopes.scala:59)
        at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.$anonfun$doGenCode$1(objects.scala:598)
        at scala.Option.map(Option.scala:230)
        at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.doGenCode(objects.scala:598)
        at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:201)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:196)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.$anonfun$create$1(GenerateSafeProjection.scala:156)
        at scala.collection.immutable.List.map(List.scala:293)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:153)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:39)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1369)
        at org.apache.spark.sql.catalyst.expressions.SafeProjection$.createCodeGeneratedObject(Projection.scala:171)
        at org.apache.spark.sql.catalyst.expressions.SafeProjection$.createCodeGeneratedObject(Projection.scala:168)
        at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:51)
        at org.apache.spark.sql.catalyst.expressions.SafeProjection$.create(Projection.scala:194)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:173)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:166)
        at org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.write(ForeachWriterTable.scala:147)
        at org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.write(ForeachWriterTable.scala:132)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:493)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:448)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1521)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1487)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}
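This issue is similar to SPARK-43198. The failure occurs on the executor while the encoder's deserializer resolves the outer scope of the REPL-defined case class ({{OuterScopes.getOuterScope}}). That lookup tries to load the REPL wrapper class {{ammonite.$sess.cmd4$}}, which cannot be initialized there.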

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to