Vijay created BAHIR-301:
---------------------------

             Summary: Unable to persist Oracle's Number(16,9) using 
sql-streaming-jdbc
                 Key: BAHIR-301
                 URL: https://issues.apache.org/jira/browse/BAHIR-301
             Project: Bahir
          Issue Type: Bug
          Components: Spark Streaming Connectors
    Affects Versions: Spark-2.4.0
            Reporter: Vijay
             Fix For: Spark-2.4.0


While trying to persist Oracle's Number(16,9) using sql-streaming-jdbc, getting 
below error.

+*Java Code Snippet:*+

{{Dataset<Row> rawData = spark.readStream()}}
{{                .format("csv").schema(getSchema())}}
{{                .csv("/rates-streaming/*.csv");}}
{{    }}
{{        rawData.createOrReplaceTempView("testRates");}}
{{        }}
{{        rawData.printSchema();}}
{{    results.printSchema();}}
{{    }}
{{    Dataset<Row> results = spark.sql("select rate  from testRates ");}}
{{    }}
{{        StreamingQuery query = 
results.writeStream().outputMode("append").format("streaming-jdbc")}}
{{                
.outputMode(OutputMode.Append()).option(JDBCOptions.JDBC_URL(), datasource)}}
{{                .option(JDBCOptions.JDBC_TABLE_NAME(), "TABLENAME")}}
{{                .option(JDBCOptions.JDBC_DRIVER_CLASS(), 
"oracle.jdbc.driver.OracleDriver")}}
{{                .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE(), 
"100").option("user", username)}}
{{                .option("password", password).option("truncate", 
false).trigger(Trigger.ProcessingTime("15 seconds")).start();}}
{{        query.awaitTermination();}}
{{        }}
{{        }}
{{        }}
{{        public StructType getSchema() {}}
{{        List<StructField> fields = new ArrayList<>();}}
{{        fields.add(DataTypes.createStructField("rate", 
DataTypes.createDecimalType(16, 9), true));}}
{{        StructType schema = DataTypes.createStructType(fields);}}
{{        return schema;}}
{{        }}}

+*Logs:*+

{{java.lang.ClassCastException: org.apache.spark.sql.types.Decimal cannot be 
cast to java.math.BigDecimal}}
{{    at org.apache.spark.sql.Row.getDecimal(Row.scala:262) 
~[spark-catalyst_2.12-2.4.0.jar:2.4.0]}}
{{    at org.apache.spark.sql.Row.getDecimal$(Row.scala:262) 
~[spark-catalyst_2.12-2.4.0.jar:2.4.0]}}
{{    at 
org.apache.spark.sql.catalyst.expressions.GenericRow.getDecimal(rows.scala:166) 
~[spark-catalyst_2.12-2.4.0.jar:2.4.0]}}
{{    at 
org.apache.bahir.sql.streaming.jdbc.JdbcUtil$.$anonfun$makeSetter$12(JdbcUtil.scala:102)
 ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}}
{{    at 
org.apache.bahir.sql.streaming.jdbc.JdbcUtil$.$anonfun$makeSetter$12$adapted(JdbcUtil.scala:101)
 ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}}
{{    at 
org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.doWriteAndResetBuffer(JdbcStreamWriter.scala:174)
 ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}}
{{    at 
org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.write(JdbcStreamWriter.scala:156)
 ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}}
{{    at 
org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.write(JdbcStreamWriter.scala:79)
 ~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}}
{{    at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:118)
 ~[spark-sql_2.12-2.4.0.jar:2.4.0]}}
{{    at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
 ~[spark-core_2.12-2.4.0.jar:2.4.0]}}
{{    at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
 ~[spark-sql_2.12-2.4.0.jar:2.4.0]}}
{{    at 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
 ~[spark-sql_2.12-2.4.0.jar:2.4.0]}}
{{    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
~[spark-core_2.12-2.4.0.jar:2.4.0]}}
{{    at org.apache.spark.scheduler.Task.run(Task.scala:121) 
~[spark-core_2.12-2.4.0.jar:2.4.0]}}
{{    at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
 ~[spark-core_2.12-2.4.0.jar:2.4.0]}}
{{    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 
~[spark-core_2.12-2.4.0.jar:2.4.0]}}
{{    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) 
~[spark-core_2.12-2.4.0.jar:2.4.0]}}
{{    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[na:1.8.0_291]}}
{{    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[na:1.8.0_291]}}
{{    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]}}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to