[
https://issues.apache.org/jira/browse/SPARK-35126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zhangrenhua resolved SPARK-35126.
---------------------------------
Resolution: Not A Problem
I found that by setting fetchSize, the query can respond quickly and return the
ResultSet object. If it is a MySQL database, you also need to add the
useCursorFetch=true parameter to the JDBC URL. Thank you for your answer; I will
close this pull request.
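
For reference, a minimal sketch of the workaround described above, assuming the
same MySQL instance, credentials, and SPARK_TEST1 table used in the reproduction
below; the fetch size value is illustrative only.

{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JdbcFetchSizeExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("jdbc fetchsize test")
                .master("local[*]")
                .getOrCreate();

        // useCursorFetch=true makes the MySQL driver honor the fetch size and
        // stream rows with a server-side cursor instead of buffering the
        // whole result set before returning.
        Dataset<Row> table = spark.read()
                .format("jdbc")
                .option("url",
                        "jdbc:mysql://192.168.10.226:32320/test?useCursorFetch=true&useSSL=false")
                .option("user", "root")
                .option("password", "123456")
                .option("query", "select * from SPARK_TEST1")
                // Spark's JDBC source passes this value to Statement#setFetchSize.
                .option("fetchsize", "1000")
                .load();

        System.out.println(table.limit(1).first());
        spark.stop();
    }
}
{code}
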
> Execute jdbc cancellation method when jdbc load job is interrupted
> ------------------------------------------------------------------
>
> Key: SPARK-35126
> URL: https://issues.apache.org/jira/browse/SPARK-35126
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.1.1
> Environment: Environment version:
> * spark3.1.1
> * jdk1.8.201
> * scala2.12
> * mysql5.7.31
> * mysql-connector-java-5.1.32.jar /mysql-connector-java-8.0.32.jar
> Reporter: zhangrenhua
> Priority: Major
> Original Estimate: 2h
> Remaining Estimate: 2h
>
> I have a long-running Spark service that continuously receives and runs Spark
> programs submitted by clients. One of these programs loads a JDBC table. The
> query SQL is very complicated, and each execution takes a lot of time and
> resources. After submitting such a request, the client may interrupt the job at
> any time. I found that after the job was interrupted, the database SELECT
> process was still executing and had not been killed.
>
> *Scene demonstration:*
> 1. Prepare two tables, SPARK_TEST1 and SPARK_TEST2, each of which has 1000
> records.
> 2. Test code
> {code:java}
> import org.apache.spark.SparkConf;
> import org.apache.spark.SparkContext;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
>
> import java.util.concurrent.TimeUnit;
>
> /**
>  * jdbc load cancel test
>  *
>  * @author gavin
>  * @create 2021/4/18 10:58
>  */
> public class JdbcLoadCancelTest {
>
>     public static void main(String[] args) throws Exception {
>         final SparkConf sparkConf = new SparkConf();
>         sparkConf.setAppName("jdbc load test");
>         sparkConf.setMaster("local[*]");
>         final SparkContext sparkContext = new SparkContext(sparkConf);
>         final SparkSession sparkSession = new SparkSession(sparkContext);
>
>         // This is a sql query that takes about a minute to execute
>         String querySql = "select t1.*\n" +
>                 "from SPARK_TEST1 t1\n" +
>                 "left join SPARK_TEST1 t2 on 1=1\n" +
>                 "left join (select aa from SPARK_TEST1 limit 3) t3 on 1=1";
>
>         // Specify job information
>         final String jobGroup = "test";
>         sparkContext.clearJobGroup();
>         sparkContext.setJobGroup(jobGroup, "test", true);
>
>         // Start an independent thread that runs the jdbc load test logic
>         new Thread(() -> {
>             final Dataset<Row> table = sparkSession.read()
>                     .format("jdbc")
>                     .option("url",
>                             "jdbc:mysql://192.168.10.226:32320/test?useUnicode=true&characterEncoding=utf-8&useSSL=false")
>                     .option("user", "root")
>                     .option("password", "123456")
>                     .option("query", querySql)
>                     .load();
>             // Print the first row
>             System.out.println(table.limit(1).first());
>         }).start();
>
>         // Wait for the jdbc load job to start
>         TimeUnit.SECONDS.sleep(10);
>
>         // Cancel the job that was just started
>         sparkContext.cancelJobGroup(jobGroup);
>
>         // Simulate a long-running service that does not stop the driver
>         // process and keeps waiting for new jobs
>         TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
>     }
> }
> {code}
>
> 3. View the MySQL process list
> {code:sql}
> select * from information_schema.`PROCESSLIST` where info is not null;
> {code}
> Ten seconds after the program started, when the job was interrupted, I found
> that the database query process had still not been killed.
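>
> The summary asks Spark to execute the JDBC cancellation method when the load
> job is interrupted. As a point of reference, here is a minimal, hypothetical
> sketch (plain JDBC, not Spark's actual data source code) of cancelling a
> long-running query from another thread via java.sql.Statement#cancel; the URL,
> credentials, and query are reused from the reproduction above for illustration
> only.
> {code:java}
> import java.sql.Connection;
> import java.sql.DriverManager;
> import java.sql.ResultSet;
> import java.sql.SQLException;
> import java.sql.Statement;
> import java.util.concurrent.TimeUnit;
>
> /**
>  * Hypothetical sketch: cancel a running JDBC query when the thread that
>  * executes it is interrupted. Not Spark source code.
>  */
> public class JdbcCancelSketch {
>
>     public static void main(String[] args) throws Exception {
>         Connection conn = DriverManager.getConnection(
>                 "jdbc:mysql://192.168.10.226:32320/test?useSSL=false",
>                 "root", "123456");
>         Statement stmt = conn.createStatement();
>
>         // Worker thread runs the expensive SELECT; executeQuery blocks and
>         // does not react to Thread#interrupt on its own.
>         Thread worker = new Thread(() -> {
>             try (ResultSet rs = stmt.executeQuery(
>                     "select t1.* from SPARK_TEST1 t1 left join SPARK_TEST1 t2 on 1=1")) {
>                 while (rs.next()) {
>                     // consume rows
>                 }
>             } catch (SQLException e) {
>                 // A cancelled statement surfaces as an SQLException
>                 // (e.g. "query execution was interrupted" on MySQL).
>                 System.out.println("query ended: " + e.getMessage());
>             }
>         });
>         worker.start();
>
>         // Simulate the job being interrupted after 10 seconds, then propagate
>         // the interruption to the database by cancelling the statement.
>         TimeUnit.SECONDS.sleep(10);
>         worker.interrupt();
>         stmt.cancel(); // asks the server to kill the running query
>
>         worker.join();
>         stmt.close();
>         conn.close();
>     }
> }
> {code}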
>
>
>