*Challenge*

Insert data from a Spark dataframe when one or more columns in the Oracle table rely on derived columns whose values depend on data in one or more dataframe columns.
Standard JDBC from Spark to Oracle does a batch insert of the dataframe into the Oracle table, *so it cannot handle these derived columns*. Refer below (url is the Oracle JDBC URL and driver is the Oracle driver class):

dataFrame. \
    write. \
    format("jdbc"). \
    option("url", url). \
    option("dbtable", "schema.tableName"). \
    option("user", user). \
    option("password", password). \
    option("driver", driver). \
    mode(mode). \
    save()

This writes the whole content of the dataframe to the Oracle table; one cannot replace schema.tableName with an INSERT statement.

*Possible solution*

1. We need a cursor-based solution: create a cursor from the Spark dataframe so we can walk through every row and get the value of each column.
2. Oracle provides the cx_Oracle package. cx_Oracle <https://oracle.github.io/python-cx_Oracle/> is a Python extension module that enables access to Oracle Database. It conforms to the Python Database API 2.0 specification <http://www.python.org/topics/database/DatabaseAPI-2.0.html> with a considerable number of additions and a couple of exclusions. It is maintained by Oracle.
3. Using cx_Oracle we can create a connection to Oracle and use Connection.cursor() to deal with the rows, as in the example below.

This is an example. It creates a connection to Oracle; the cx_Oracle package needs to be installed alongside PySpark.

import cx_Oracle

def loadIntoOracleTableWithCursor(self, df):
    # set Oracle details
    tableName = "randomdata"
    fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
    user = self.config['OracleVariables']['oracle_user']
    password = self.config['OracleVariables']['oracle_password']
    serverName = self.config['OracleVariables']['oracleHost']
    port = self.config['OracleVariables']['oraclePort']
    serviceName = self.config['OracleVariables']['serviceName']
    dsn_tns = cx_Oracle.makedsn(serverName, port, service_name=serviceName)
    # create connection conn
    conn = cx_Oracle.connect(user, password, dsn_tns)
    cursor = conn.cursor()
    # df is the dataframe containing the data. Walk through every row.
    for row in df.rdd.collect():
        # get individual column values from the dataframe row
        id = row[0]
        clustered = row[1]
        scattered = row[2]
        randomised = row[3]
        random_string = row[4]
        small_vc = row[5]
        padding = row[6]
        # Build the INSERT statement to be executed in Oracle for this row.
        # The Oracle table has a column called derived_col that the dataframe
        # does not have; it is derived from the value of one or more dataframe
        # columns. Here derived_col = cos(id) is passed in sqlText. Use {} to
        # pass each value, enclosed in single quotes if the column is of
        # character type.
        sqlText = f"""insert into {fullyQualifiedTableName} (id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col) values ({id},{clustered},{scattered},{randomised},'{random_string}','{small_vc}','{padding}',cos({id}))"""
        print(sqlText)
        cursor.execute(sqlText)
    conn.commit()

Our dataframe has 10 rows, and id in the Oracle table has been made the primary key:

scratch...@orasource.mich.LOCAL> CREATE TABLE scratchpad.randomdata
  2  (
  3    "ID" NUMBER(*,0),
  4    "CLUSTERED" NUMBER(*,0),
  5    "SCATTERED" NUMBER(*,0),
  6    "RANDOMISED" NUMBER(*,0),
  7    "RANDOM_STRING" VARCHAR2(50 BYTE),
  8    "SMALL_VC" VARCHAR2(50 BYTE),
  9    "PADDING" VARCHAR2(4000 BYTE),
 10    "DERIVED_COL" FLOAT(126)
 11  );

Table created.

scratch...@orasource.mich.LOCAL> ALTER TABLE scratchpad.randomdata ADD CONSTRAINT randomdata_PK PRIMARY KEY (ID);

Table altered.
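As an aside, not part of my original code: building sqlText with an f-string sends a fresh SQL statement per row and relies on quoting the character columns by hand. Below is a minimal sketch of the same insert using cx_Oracle bind variables and cursor.executemany(), assuming a conn created as in the example above and dataframe columns named as in the table; the function name is mine.

def loadIntoOracleTableWithBinds(df, conn, fullyQualifiedTableName):
    # conn is a cx_Oracle connection created as in the example above
    cursor = conn.cursor()
    # Named bind variables; derived_col is still computed inside Oracle.
    # In a SQL statement a repeated bind name (:id here) is supplied once.
    sqlText = f"""insert into {fullyQualifiedTableName}
                  (id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col)
                  values (:id,:clustered,:scattered,:randomised,:random_string,:small_vc,:padding,cos(:id))"""
    # one executemany() round trip for all rows instead of one execute() per row
    cursor.executemany(sqlText, [row.asDict() for row in df.collect()])
    conn.commit()

The per-row string formatting and quoting disappear, and the derived column is still evaluated by Oracle itself.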
Run it and see the output of print(sqlText):

insert into SCRATCHPAD.randomdata (id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col) values (1,0.0,0.0,2.0,'KZWeqhFWCEPyYngFbyBMWXaSCrUZoLgubbbPIayRnBUbHoWCFJ',' 1','xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',cos(1))

This works fine. It creates the rows and commits them.

*What is needed*

We need to implement a JDBC connection in Spark that handles DML in addition to DQL. The JDBC option option("dbtable", schema.tableName) should be enhanced so that schema.tableName can be replaced with an equivalent statement that allows DML to go through (an illustrative sketch follows below).

*Benefits*

This will enable standard JDBC calls from Spark to handle all these conditions rather than a single bulk insert.
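Purely for illustration, the enhancement could look something like the sketch below. Note that option("sqltext", ...) does not exist in Spark's JDBC data source today; the option name and its semantics are invented here only to make the proposal concrete.

# HYPOTHETICAL sketch only: Spark's JDBC writer has no "sqltext" option.
# The idea is that the supplied DML, with ? placeholders bound from the
# dataframe columns, would replace the generated bulk INSERT.
dataFrame. \
    write. \
    format("jdbc"). \
    option("url", url). \
    option("sqltext", """insert into scratchpad.randomdata
        (id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col)
        values (?, ?, ?, ?, ?, ?, ?, cos(?))"""). \
    option("user", user). \
    option("password", password). \
    option("driver", driver). \
    mode("append"). \
    save()

Under this proposal the ? placeholders would be bound positionally from the dataframe columns, much as the values are passed on the cx_Oracle side today.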
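Finally, a note on the workaround itself, again a sketch under my own assumptions rather than tested code: df.rdd.collect() brings every row back to the driver, which is fine for 10 rows but not for a large dataframe. The same cursor logic can run on the executors with foreachPartition(), each partition opening its own cx_Oracle connection.

def loadIntoOracleTablePerPartition(df, oracleVars):
    # oracleVars is the self.config['OracleVariables'] dictionary from above
    def insertPartition(rows):
        import cx_Oracle  # cx_Oracle must be installed on every executor
        batch = [row.asDict() for row in rows]
        if not batch:
            return  # skip empty partitions
        dsn_tns = cx_Oracle.makedsn(oracleVars['oracleHost'],
                                    oracleVars['oraclePort'],
                                    service_name=oracleVars['serviceName'])
        conn = cx_Oracle.connect(oracleVars['oracle_user'],
                                 oracleVars['oracle_password'],
                                 dsn_tns)
        cursor = conn.cursor()
        # same bind-variable INSERT as before; derived_col computed in Oracle
        sqlText = """insert into scratchpad.randomdata
                     (id,clustered,scattered,randomised,random_string,small_vc,padding,derived_col)
                     values (:id,:clustered,:scattered,:randomised,:random_string,:small_vc,:padding,cos(:id))"""
        cursor.executemany(sqlText, batch)
        conn.commit()
        conn.close()
    df.foreachPartition(insertPartition)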