Stuart Reynolds created SPARK-20846:
---------------------------------------

             Summary: Incorrect postgres sql schema inferred from table.
                 Key: SPARK-20846
                 URL: https://issues.apache.org/jira/browse/SPARK-20846
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.1.0
            Reporter: Stuart Reynolds


When a table containing an int[][] column is read from postgres over JDBC, the
column's type is inferred as int[] (it should be int[][]), and collecting the
data then fails with a ClassCastException.
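
A likely contributing factor (my reading, not confirmed in the Spark source):
postgres treats arrays of a given element type as one type regardless of the
declared number of dimensions, so int[] and int[][] columns both report as
integer[] in the catalog, and the JDBC metadata Spark sees is the same for
both. A minimal sketch checking this, assuming psycopg2 is installed and the
_dfjunk table from the repro below exists (hostname/dbname/credentials are
placeholders):

    import psycopg2

    # Both columns report the same formatted type ('integer[]'); the declared
    # dimensionality is not part of the type postgres exposes.
    conn = psycopg2.connect(host="hostname", dbname="dbname",
                            user="...", password="...")
    cur = conn.cursor()
    cur.execute("""
        SELECT attname, format_type(atttypid, atttypmod)
        FROM pg_attribute
        WHERE attrelid = '_dfjunk'::regclass AND attnum > 0
    """)
    print cur.fetchall()  # [('a1', 'integer[]'), ('b2', 'integer[]')]

The data itself still comes back nested through the JDBC driver, which is what
trips the executor below.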

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    import pandas as pd
    from dataIngest.util.sqlUtil import asSQLAlchemyEngine  # internal helper

    user, password = ..., ...  # credentials elided
    url = "postgresql://hostname:5432/dbname"
    url = 'jdbc:' + url  # the Spark JDBC reader needs the jdbc: prefix
    properties = {'user': user, 'password': password}
    engine = ... sql alchemy engine ...  # built from the plain postgresql:// URL

    # Create pandas df with int[] and int[][]
    df = pd.DataFrame({
        'a1': [[1,2,None],[1,2,3], None],
        'b2':  [[[1],[None],[3]], [[1],[2],[3]], None]
    })

    # Store df into postgres as table _dfjunk
    with engine.connect().execution_options(autocommit=True) as con:
        con.execute("""
        DROP TABLE IF EXISTS _dfjunk;
        
        CREATE TABLE _dfjunk (
          a1 int[] NULL,
          b2 int[][] NULL
        );
        """)
        df.to_sql("_dfjunk", con, index=False, if_exists="append")

    # Let's access via spark
    sc = SparkContext(master="local", appName="int-array-repro")
    sqlContext = SQLContext(sc)

    print "pandas DF as spark DF:"
    df = sqlContext.createDataFrame(df)
    df.printSchema()
    df.show()
    df.registerTempTable("df")
    print sqlContext.sql("select * from df").collect()

    # Export _dfjunk as table df3
    df3 = sqlContext.read.format("jdbc"). \
        option("url", url). \
        option("driver", "org.postgresql.Driver"). \
        option("useUnicode", "true"). \
        option("continueBatchOnError","true"). \
        option("useSSL", "false"). \
        option("user", user). \
        option("password", password). \
        option("dbtable", "_dfjunk").\
        load()
    df3.registerTempTable("df3")

    print "DF inferred from postgres:"
    df3.printSchema()
    df3.show()

    print "DF queried from postgres:"
    df3 = sqlContext.sql("select * from df3")
    df3.printSchema()
    df3.show()
    print df3.collect()
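
A possible workaround sketch (hypothetical, not something I have verified
beyond this shape of data): push a cast to text into the JDBC subquery so
Spark never has to map the int[][] type, then parse the array literal on the
Spark side if needed:

    # Hypothetical workaround: let postgres serialize the nested array to
    # text, so the inferred column type is a plain string.
    df4 = sqlContext.read.format("jdbc"). \
        option("url", url). \
        option("driver", "org.postgresql.Driver"). \
        option("user", user). \
        option("password", password). \
        option("dbtable", "(SELECT a1, b2::text AS b2 FROM _dfjunk) AS t"). \
        load()
    df4.printSchema()  # b2 arrives as a string such as '{{1,NULL,3},{1,2,3}}'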


Errors out with:

pandas DF as spark DF:
root
 |-- a1: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- b2: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: long (containsNull = true)  <<< ****** THIS IS CORRECT !!!!

+------------+--------------------+
|          a1|                  b2|
+------------+--------------------+
|[1, 2, null]|[WrappedArray(1),...|
|   [1, 2, 3]|[WrappedArray(1),...|
|        null|                null|
+------------+--------------------+

[Row(a1=[1, 2, None], b2=[[1], [None], [3]]), Row(a1=[1, 2, 3], b2=[[1], [2], [3]]), Row(a1=None, b2=None)]
DF inferred from postgres:
root
 |-- a1: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- b2: array (nullable = true)
 |    |-- element: integer (containsNull = true)    <<< ****** THIS IS WRONG!!!! It is an array of arrays.

17/05/22 15:00:39 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.ClassCastException: [Ljava.lang.Integer; cannot be cast to java.lang.Integer
        at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
        at org.apache.spark.sql.catalyst.util.GenericArrayData.getInt(GenericArrayData.scala:62)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
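
The trace is consistent with the mis-inferred schema: the generated code calls
GenericArrayData.getInt expecting a boxed Integer element (per
ArrayType(IntegerType)), but the JDBC row actually holds an Integer[] at each
position. For reference, the schema one would expect Spark to infer for
_dfjunk (a sketch of the expected types, not Spark output):

    from pyspark.sql.types import (ArrayType, IntegerType, StructField,
                                   StructType)

    expected = StructType([
        StructField("a1", ArrayType(IntegerType()), nullable=True),
        StructField("b2", ArrayType(ArrayType(IntegerType())), nullable=True),
    ])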


