I am using Spark via Java for a MySQL/ML (machine learning) project.

In the MySQL database I have a column "status_change_type" of type ENUM('broke', 'fixed')
in a table called "status_change", in a database called "test".

I have an object StatusChangeDB that models the structure of that table; however, I
declared "status_change_type" as a String. I know a MySQL enum and a Java String are
represented quite differently, but since I am using Spark, the encoder does not handle
Java enums properly. When I try to set the enum column's value via a Java String, I
receive a "data truncated" error:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 9, localhost, executor driver): java.sql.BatchUpdateException: Data truncated for column 'status_change_type' at row 1
        at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:2055)
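
For context, the bean and the encoder usage look roughly like this. This is a minimal
sketch: only status_change_type comes from the table description above; the id field,
the SparkSession variable "spark", and the list "changes" are placeholder names for
illustration.

public class StatusChangeDB implements java.io.Serializable {
    private int id;                     // placeholder field, for illustration only
    private String status_change_type;  // maps to the MySQL ENUM('broke','fixed') column

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getStatus_change_type() { return status_change_type; }
    public void setStatus_change_type(String status_change_type) {
        this.status_change_type = status_change_type;
    }
}

// Sketch of building the Dataset from these beans via the bean encoder
Encoder<StatusChangeDB> encoder = Encoders.bean(StatusChangeDB.class);
Dataset<Row> changeDF = spark.createDataset(changes, encoder).toDF();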


I have also tried declaring "status_change_type" as a Java enum, but that fails with
the following stack trace:

   
    Exception in thread "AWT-EventQueue-0" java.lang.NullPointerException
        at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
        at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
        at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
        at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:127)
        at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
        at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:127)
        at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        ...
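
For clarity, the enum-based attempt just swaps the String field for a Java enum, roughly
like this (again a sketch; the enum's name and its nesting inside the bean are my own
choices here, only the values mirror the MySQL column):

public class StatusChangeDB implements java.io.Serializable {
    // Values mirror the MySQL ENUM('broke','fixed') definition
    public enum StatusChangeType { broke, fixed }

    private StatusChangeType status_change_type;  // enum instead of String

    public StatusChangeType getStatus_change_type() { return status_change_type; }
    public void setStatus_change_type(StatusChangeType status_change_type) {
        this.status_change_type = status_change_type;
    }
}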


I have also tried the JDBC setting "jdbcCompliantTruncation=false", but it makes no
difference: I get the same "data truncated" error as above. Here is my JDBC options
map, in case I am using "jdbcCompliantTruncation=false" incorrectly:

public static Map<String, String> jdbcOptions() {
    Map<String, String> jdbcOptions = new HashMap<String, String>();
    // jdbcCompliantTruncation=false is appended to the connection URL here
    jdbcOptions.put("url", "jdbc:mysql://localhost:3306/test?jdbcCompliantTruncation=false");
    jdbcOptions.put("driver", "com.mysql.jdbc.Driver");
    jdbcOptions.put("dbtable", "status_change");
    jdbcOptions.put("user", "root");
    jdbcOptions.put("password", "");
    return jdbcOptions;
}
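
These keys are the ones Spark's JDBC data source recognizes, so for context the same map
could also be fed straight to the DataFrameWriter, roughly like this (a sketch of that
alternative, assuming a Spark 2.x DataFrameWriter; it is not what my insert method below
does):

// Sketch only: writing through the JDBC data source with the options map above
changeDF.write()
        .format("jdbc")
        .options(SparkManager.jdbcOptions())  // url, driver, dbtable, user, password
        .mode(SaveMode.Append)
        .save();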

Here is the Spark method I use for inserting into the MySQL DB:

private void insertMYSQLQuery(Dataset<Row> changeDF) {
    try {
        // Append the Dataset rows to the existing status_change table
        changeDF.write().mode(SaveMode.Append)
                .jdbc(SparkManager.jdbcAppendOptions(), "status_change", new java.util.Properties());
    } catch (Exception e) {
        System.out.println(e);
    }
}

where jdbcAppendOptions builds the connection URL from the jdbcOptions map:

public static String jdbcAppendOptions() {
    return SparkManager.jdbcOptions().get("url")
            + "&user=" + SparkManager.jdbcOptions().get("user")
            + "&password=" + SparkManager.jdbcOptions().get("password");
}
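
With the option values above, that resolves to a URL along the lines of:

    jdbc:mysql://localhost:3306/test?jdbcCompliantTruncation=false&user=root&password=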

How can I get values into a MySQL ENUM column using Spark, or otherwise avoid this
"data truncated" error?

My only other thought would be to change the column in the DB itself to VARCHAR, but
the project leader is not too happy with that idea.



