Karen created BEAM-13766:
----------------------------

             Summary: windowed text writer fails with SparkRunner when beam 
jars get upgraded from 2.28 to 2.34
                 Key: BEAM-13766
                 URL: https://issues.apache.org/jira/browse/BEAM-13766
             Project: Beam
          Issue Type: Bug
          Components: beam-community
    Affects Versions: 2.34.0
         Environment: Linux xxxxx 3.10.0-1160.53.1.el7.x86_64 #1 SMP Thu Dec 16 
10:19:28 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux, Java jdk1.8.0_311, Mapr FS
            Reporter: Karen


I've created a Java Beam pipeline with a Kafka reader, a simple transformer, and 
a windowed text writer. It worked fine with Beam 2.28.0 jars with both the direct 
runner and the Spark runner. However, after upgrading Beam to 2.34.0, without any 
other change, the direct runner still works but the Spark runner fails with the 
exception below. There is no stateId/timer annotation anywhere in my application 
code, so I am not sure why one appears in the translated code. Please help, and 
let me know if you need more details. Thanks.

2022-01-26 16:40:00.410 ERROR 4145 --- [           main] 
o.a.spark.deploy.SparkSubmit$$anon$2     : Failed to execute CommandLineRunner

java.lang.IllegalStateException: Failed to execute CommandLineRunner
        at 
org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:807)
 ~[spring-boot-2.4.2.jar:2.4.2]
        at 
org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:788)
 ~[spring-boot-2.4.2.jar:2.4.2]
        at 
org.springframework.boot.SpringApplication.run(SpringApplication.java:333) 
~[spring-boot-2.4.2.jar:2.4.2]
        at 
org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:144)
 ~[spring-boot-2.4.2.jar:2.4.2]
        at com.wellsfargo.dct.beam.DctApplication.main(DctApplication.java:76) 
~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[na:1.8.0_311]
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[na:1.8.0_311]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[na:1.8.0_311]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_311]
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) 
~[spark-core_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:852)
 [spark-core_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) 
[spark-core_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) 
[spark-core_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) 
[spark-core_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:940) 
[spark-core_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:949) 
[spark-core_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
[spark-core_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
Caused by: java.lang.UnsupportedOperationException: Found StateId annotations 
on org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn, but 
DoFn cannot yet be used with state in the SparkRunner.
        at 
org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers(TranslationUtils.java:271)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9.evaluate(StreamingTransformTranslator.java:418)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9.evaluate(StreamingTransformTranslator.java:409)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:449)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:438)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469) 
~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:88)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:46)
 ~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:627)
 ~[spark-streaming_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
        at 
org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:626)
 ~[spark-streaming_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
        at scala.Option.getOrElse(Option.scala:121) 
~[scala-library-2.11.12.jar:na]
        at 
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:848)
 ~[spark-streaming_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
        at 
org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:626)
 ~[spark-streaming_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
        at 
org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
 ~[spark-streaming_2.11-2.4.4.200-mapr-632.jar:2.4.4.200-mapr-632]
        at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:180) 
~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:96) 
~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323) 
~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309) 
~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at com.wellsfargo.dct.beam.DctApplication.run(DctApplication.java:221) 
~[dct-beam-1.0.0-SNAPSHOT-all.jar:na]
        at 
org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804)
 ~[spring-boot-2.4.2.jar:2.4.2]
        ... 16 common frames omitted



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to