wulinzhai opened a new issue, #5119: URL: https://github.com/apache/seatunnel/issues/5119
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

The `k1` and `v1` placeholders in `partition_dir_expression = "${k1}=${v1}"` in the config file do not seem to be read from the command line.

### SeaTunnel Version

2.3.2

### SeaTunnel Config

```conf
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  Amazondynamodb {
    url = "https://dynamodb.us-east-1.amazonaws.com"
    region = "us-east-1"
    access_key_id = ""
    secret_access_key = ""
    table = "AuthCode"
    schema = {
      fields {
        code = string
      }
    }
    result_table_name = "source_table"
  }
}

transform {
  FieldMapper {
    source_table_name = "source_table"
    result_table_name = "transform_table"
    field_mapper = {
      code=code
    }
  }
}

sink {
  S3File {
    bucket = "s3a://wonder-data-warehouse"
    tmp_path = "/tmp/seatunnel"
    path="/result/auth_code/"
    fs.s3a.endpoint="s3.us-east-1.amazonaws.com"
    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = ""
    secret_key = ""
    file_format_type = "parquet"
    have_partition = true
    partition_by = ["input_date"]
    partition_dir_expression = "${k1}=${v1}"
    is_partition_field_write_in_file = true
    sink_columns = ["code", "input_date"]
    is_enable_transaction=true
    compress_codec = "snappy"
    hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/mnt/module/seatunnel-2.3.2/buffer"
      "fs.s3a.fast.upload.buffer" = "disk"
    }
  }
}
```

### Running Command

```shell
../bin/start-seatunnel-spark-3-connector-v2.sh -m 'local[*]' -e client --config ddb-to-s3.template -i k1=input_date -i v1=2023-06-01
```

### Error Exception

```log
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:354)
    ... 55 more
Suppressed: java.lang.RuntimeException: SinkAggregatedCommitter abort failed in driver
    at org.apache.seatunnel.translation.spark.sink.SeaTunnelBatchWrite.abort(SeaTunnelBatchWrite.java:76)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:376)
    ... 55 more
Caused by: java.lang.NullPointerException
    at org.apache.seatunnel.translation.spark.sink.SeaTunnelBatchWrite.lambda$combineCommitMessage$0(SeaTunnelBatchWrite.java:102)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.seatunnel.translation.spark.sink.SeaTunnelBatchWrite.combineCommitMessage(SeaTunnelBatchWrite.java:104)
    at org.apache.seatunnel.translation.spark.sink.SeaTunnelBatchWrite.abort(SeaTunnelBatchWrite.java:74)
    ... 56 more
Caused by: java.lang.NullPointerException
    at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy.generatorPartitionDir(AbstractWriteStrategy.java:216)
    at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy.getOrCreateFilePathBeingWritten(AbstractWriteStrategy.java:367)
    at org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy.write(ParquetWriteStrategy.java:95)
    at org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter.write(BaseFileSinkWriter.java:126)
    at org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter.write(BaseFileSinkWriter.java:43)
    at org.apache.seatunnel.translation.spark.sink.write.SeaTunnelSparkDataWriter.write(SeaTunnelSparkDataWriter.java:59)
    at org.apache.seatunnel.translation.spark.sink.write.SeaTunnelSparkDataWriter.write(SeaTunnelSparkDataWriter.java:37)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:419)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1508)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:457)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:133)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
23/07/19 11:25:04 ERROR SeaTunnel: ===============================================================================
23/07/19 11:25:04 ERROR SeaTunnel: Fatal Error,
23/07/19 11:25:04 ERROR SeaTunnel: Please submit bug report in https://github.com/apache/seatunnel/issues
23/07/19 11:25:04 ERROR SeaTunnel: Reason:Writing job failed.
23/07/19 11:25:04 ERROR SeaTunnel: Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: Writing job failed.
    at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:63)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/07/19 11:25:04 ERROR SeaTunnel: ===============================================================================
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: Writing job failed.
    at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:63)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/07/19 11:25:04 INFO SparkContext: Invoking stop() from shutdown hook
23/07/19 11:25:04 INFO AbstractConnector: Stopped Spark@c6e0f32{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
23/07/19 11:25:04 INFO SparkUI: Stopped Spark web UI at http://ip-172-31-21-61.ec2.internal:4040
23/07/19 11:25:04 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/07/19 11:25:04 INFO MemoryStore: MemoryStore cleared
23/07/19 11:25:04 INFO BlockManager: BlockManager stopped
23/07/19 11:25:04 INFO BlockManagerMaster: BlockManagerMaster stopped
23/07/19 11:25:04 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/07/19 11:25:04 INFO SparkContext: Successfully stopped SparkContext
23/07/19 11:25:04 INFO ShutdownHookManager: Shutdown hook called
23/07/19 11:25:04 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-7aa693d5-1a4c-4466-aa9c-88ee828095cd
23/07/19 11:25:04 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-0ea3924d-f35c-4600-9dfc-e48afd3999b5
23/07/19 11:25:05 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system...
23/07/19 11:25:05 INFO MetricsSystemImpl: s3a-file-system metrics system stopped.
23/07/19 11:25:05 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.
```

### Flink or Spark Version

spark 3.2.1

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
