Vijay created BAHIR-301: --------------------------- Summary: Unable to persist Oracle's Number(16,9) using sql-streaming-jdbc Key: BAHIR-301 URL: https://issues.apache.org/jira/browse/BAHIR-301 Project: Bahir Issue Type: Bug Components: Spark Streaming Connectors Affects Versions: Spark-2.4.0 Reporter: Vijay Fix For: Spark-2.4.0
While trying to persist Oracle's Number(16,9) using sql-streaming-jdbc, getting below error. +*Java Code Snippet:*+ {{Dataset<Row> rawData = spark.readStream()}} {{ .format("csv").schema(getSchema())}} {{ .csv("/rates-streaming/*.csv");}} {{ }} {{ rawData.createOrReplaceTempView("testRates");}} {{ }} {{ rawData.printSchema();}} {{ results.printSchema();}} {{ }} {{ Dataset<Row> results = spark.sql("select rate from testRates ");}} {{ }} {{ StreamingQuery query = results.writeStream().outputMode("append").format("streaming-jdbc")}} {{ .outputMode(OutputMode.Append()).option(JDBCOptions.JDBC_URL(), datasource)}} {{ .option(JDBCOptions.JDBC_TABLE_NAME(), "TABLENAME")}} {{ .option(JDBCOptions.JDBC_DRIVER_CLASS(), "oracle.jdbc.driver.OracleDriver")}} {{ .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE(), "100").option("user", username)}} {{ .option("password", password).option("truncate", false).trigger(Trigger.ProcessingTime("15 seconds")).start();}} {{ query.awaitTermination();}} {{ }} {{ }} {{ }} {{ public StructType getSchema() {}} {{ List<StructField> fields = new ArrayList<>();}} {{ fields.add(DataTypes.createStructField("rate", DataTypes.createDecimalType(16, 9), true));}} {{ StructType schema = DataTypes.createStructType(fields);}} {{ return schema;}} {{ }}} +*Logs:*+ {{java.lang.ClassCastException: org.apache.spark.sql.types.Decimal cannot be cast to java.math.BigDecimal}} {{ at org.apache.spark.sql.Row.getDecimal(Row.scala:262) ~[spark-catalyst_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.spark.sql.Row.getDecimal$(Row.scala:262) ~[spark-catalyst_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.spark.sql.catalyst.expressions.GenericRow.getDecimal(rows.scala:166) ~[spark-catalyst_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.bahir.sql.streaming.jdbc.JdbcUtil$.$anonfun$makeSetter$12(JdbcUtil.scala:102) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.bahir.sql.streaming.jdbc.JdbcUtil$.$anonfun$makeSetter$12$adapted(JdbcUtil.scala:101) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.doWriteAndResetBuffer(JdbcStreamWriter.scala:174) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.write(JdbcStreamWriter.scala:156) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.write(JdbcStreamWriter.scala:79) ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:118) ~[spark-sql_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394) ~[spark-core_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116) ~[spark-sql_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67) ~[spark-sql_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.spark.scheduler.Task.run(Task.scala:121) ~[spark-core_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405) ~[spark-core_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) ~[spark-core_2.12-2.4.0.jar:2.4.0]}} {{ at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) ~[spark-core_2.12-2.4.0.jar:2.4.0]}} {{ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_291]}} {{ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_291]}} {{ at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]}} -- This message was sent by Atlassian Jira (v8.20.1#820001)