Hello,

I have been working on a small ETL framework for PySpark/Delta/Databricks in my spare time.

It looks like I might have encountered a bug, though I'm not totally sure it's actually caused by Spark itself rather than one of the other technologies.

The error shows up when using Spark SQL to compare an incoming DataFrame from JDBC/MSSQL with an empty Delta table. Spark sends a query to MSSQL ending in 'WHERE (1)', which is invalid T-SQL syntax and causes an exception to be thrown. Unless reading in parallel, no WHERE clause should be needed at all, since the code reads all rows from the source.
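For illustration, the query arriving at the server ends up looking roughly like this (schematic, column list elided):

SELECT <columns> FROM test WHERE (1)

T-SQL does not accept a bare integer as a boolean predicate, hence the syntax error.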

The error does not happen on Databricks 10.4 LTS with Spark 3.2.1, but from Databricks 11.3 LTS with Spark 3.3.0 onwards it shows up. The error also does not happen with PostgreSQL or MySQL, so either the resulting SQL is valid there or it does not contain the extra 'WHERE (1)'.

I have provided some sample PySpark code below that can be used to reproduce the error on Databricks Community Edition and an MSSQL server. I have written two different versions of the SQL statement; both versions result in the same error.

If there is some option or other trick that can be used to circumvent the error on newer releases, I would be grateful to learn about it. However, being able to use a single SQL statement for this is preferable, to keep it short, idempotent and atomic.
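For reference, one candidate workaround I have not verified is disabling predicate pushdown on the JDBC read (the pushDownPredicate option, true by default), so that Spark no longer compiles filters into the remote query. A sketch, using the same connection variables as in the repro below:

# Untested sketch; I don't know yet whether this avoids the generated WHERE clause.
df = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://{host}:{port};databaseName={database}") \
    .option("dbtable", table) \
    .option("user", username) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("pushDownPredicate", "false") \
    .load()

I would still rather understand why the '(1)' is generated in the first place.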

Best regards
Lennart


Code:

# Add _checksum column to beginning of data frame.
def add_checksum_column(df):
    from pyspark.sql.functions import concat_ws, md5
    return df.select([md5(concat_ws("<|^|>", *sorted(df.columns))).alias("_checksum"), "*"])

# Add _key column to beginning of data frame.
def add_key_column(df, key):
    if isinstance(key, str):
        df = df.select([df[key].alias("_key"), "*"])
    elif isinstance(key, list):
        from pyspark.sql.functions import concat_ws
        df = df.select([concat_ws("-", *key).alias("_key"), "*"])
    else:
        raise Exception("Invalid key")
    return df

# Create Delta table.
def create_table(df, key, target):
    if spark.catalog._jcatalog.tableExists(target):
        return
    from pyspark.sql.functions import current_timestamp, lit
    df = add_checksum_column(df)                                    # (4) _checksum
    df = add_key_column(df, key)                                    # (3) _key
    df = df.select([current_timestamp().alias("_timestamp"), "*"])  # (2) _timestamp
    df = df.select([lit("I").alias("_operation"), "*"])             # (1) _operation
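    # Write zero rows so the Delta table is created empty with the derived schema.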
    df.filter("1=0").write.format("delta") \
        .option("delta.autoOptimize.optimizeWrite", "true") \
        .option("delta.autoOptimize.autoCompact", "true") \
        .saveAsTable(target)

# Capture inserted and updated records from full or partial source data frame.
def insert_update(df, key, target, query):
    # Prepare source view.
    df = add_checksum_column(df)
    df = add_key_column(df, key)
    df.createOrReplaceTempView("s")
    # Prepare target view.
    spark.table(target).createOrReplaceTempView("t")
    # Insert records.
    return spark.sql(query)

query1 = """
INSERT INTO t
SELECT CASE WHEN a._key IS NULL THEN "I" ELSE "U" END AS _operation, CURRENT_TIMESTAMP AS _timestamp, s.*
FROM s
LEFT JOIN
(
    SELECT t._key, t._checksum
    FROM t
    INNER JOIN (SELECT _key, MAX(_timestamp) AS m FROM t GROUP BY _key) AS m
    ON t._key = m._key AND t._timestamp = m.m
    WHERE t._operation <> "D"
)
AS a
ON s._key = a._key
WHERE (a._key IS NULL)          -- Insert
OR (s._checksum <> a._checksum) -- Update
"""

query2 = """
INSERT INTO t
SELECT CASE WHEN a._key IS NULL THEN "I" ELSE "U" END AS _operation, CURRENT_TIMESTAMP AS _timestamp, s.*
FROM s
LEFT JOIN
(
    SELECT _key, _checksum, ROW_NUMBER() OVER (PARTITION BY _key ORDER BY _timestamp DESC) AS rn
    FROM t
    WHERE _operation <> "D"
)
AS a
ON s._key = a._key AND a.rn = 1
WHERE (a._key IS NULL)          -- Insert
OR (s._checksum <> a._checksum) -- Update
"""

host     = "mssql.test.com"
port     = "1433"
database = "test"
username = "test"
password = "test"
table    = "test"
key      = "test_id"
target   = "archive.test"

df = spark.read.jdbc(
    properties = {"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver", "user": username, "password": password},
    url = f"jdbc:sqlserver://{host}:{port};databaseName={database}",
    table = table
)

create_table(df=df, key=key, target=target)
insert_update(df=df, key=key, target=target, query=query1)


