This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c30b529 [SPARK-27776][SQL] Avoid duplicate Java reflection in DataSource.
c30b529 is described below
commit c30b5297bc607ae33cc2fcf624b127942154e559
Author: gengjiaan <[email protected]>
AuthorDate: Tue May 28 09:26:06 2019 -0500
[SPARK-27776][SQL] Avoid duplicate Java reflection in DataSource.
## What changes were proposed in this pull request?
Looking at `org.apache.spark.sql.execution.datasources.DataSource`, there is duplicated Java reflection:
`sourceSchema`, `createSource`, `createSink`, `resolveRelation`, and `writeAndRead`
all call `providingClass.getConstructor().newInstance()`.
The instance created from `providingClass` is stateless, as with these providers (a small sketch follows the list):
`KafkaSourceProvider`
`RateSourceProvider`
`TextSocketSourceProvider`
`JdbcRelationProvider`
`ConsoleSinkProvider`
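To make the statelessness concrete, here is a small hypothetical stand-in (not Spark code) showing that each reflective `newInstance()` call just re-creates an identical, interchangeable object:
```scala
// Hypothetical stand-in for a stateless provider like those listed above.
class ExampleProvider {
  def shortName(): String = "example"
}

object StatelessDemo extends App {
  val cls: Class[_] = classOf[ExampleProvider]
  // The same pattern DataSource uses: reflective no-arg construction.
  val a = cls.getConstructor().newInstance().asInstanceOf[ExampleProvider]
  val b = cls.getConstructor().newInstance().asInstanceOf[ExampleProvider]
  println(a eq b)        // false: two distinct objects...
  println(a.shortName()) // ...with identical behavior, so the extra one buys nothing
}
```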
Java reflection can carry a significant performance cost. The Oracle tutorial at
[https://docs.oracle.com/javase/tutorial/reflect/index.html](https://docs.oracle.com/javase/tutorial/reflect/index.html)
describes the overhead:
```
Performance Overhead
Because reflection involves types that are dynamically resolved, certain
Java virtual machine optimizations can not be performed. Consequently,
reflective operations have slower performance than their non-reflective
counterparts, and should be avoided in sections of code which are called
frequently in performance-sensitive applications.
```
Some existing measurements and discussion of the cost of Java reflection:
[https://blog.frankel.ch/performance-cost-of-reflection/](https://blog.frankel.ch/performance-cost-of-reflection/)
benchmarks reflective calls against direct ones.
[https://stackoverflow.com/questions/435553/java-reflection-performance](https://stackoverflow.com/questions/435553/java-reflection-performance)
discusses when the overhead matters.
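For rough intuition, here is a minimal timing sketch (illustrative only, not from the PR; results vary by JVM, and JMH is the right tool for serious numbers) comparing direct and reflective no-arg instantiation:
```scala
// Crude micro-benchmark: direct `new` vs. reflective newInstance().
object ReflectionCost extends App {
  class Dummy

  def time(label: String)(body: => Unit): Unit = {
    val start = System.nanoTime()
    body
    println(f"$label%-10s ${(System.nanoTime() - start) / 1e6}%.1f ms")
  }

  val n = 1000000
  val ctor = classOf[Dummy].getConstructor()
  // Warm up both paths so the JIT sees them before measurement.
  (1 to n).foreach { _ => new Dummy; ctor.newInstance() }
  time("direct") { (1 to n).foreach(_ => new Dummy) }
  time("reflective") { (1 to n).foreach(_ => ctor.newInstance()) }
}
```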
So we should avoid duplicating the Java reflection and instead write the
`providingClass.getConstructor().newInstance()` call once, in a single `providingInstance()` helper.
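A minimal sketch of the resulting shape (hypothetical class names, not the real `DataSource`):
```scala
// Sketch of the refactoring: the reflective call is written once and
// shared by every method that needs a provider instance.
class ProviderHost(providingClass: Class[_]) {
  // Replaces the five duplicated reflective calls. It still instantiates
  // per call, exactly as the commit does; since the listed providers are
  // stateless, caching the instance (e.g. in a lazy val) would be a
  // possible further step that this commit does not take.
  private def providingInstance(): Any = providingClass.getConstructor().newInstance()

  def sourceSchema(): String = providingInstance() match {
    case p => s"schema from ${p.getClass.getSimpleName}"
  }

  def createSink(): String = providingInstance() match {
    case p => s"sink from ${p.getClass.getSimpleName}"
  }
}
```
For example, `new ProviderHost(classOf[java.util.ArrayList[_]]).sourceSchema()` returns `"schema from ArrayList"`.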
## How was this patch tested?
Existing unit tests.
Closes #24647 from beliefer/optimize-DataSource.
Authored-by: gengjiaan <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../spark/sql/execution/datasources/DataSource.scala | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ef430f4..04ae528 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -105,6 +105,9 @@ case class DataSource(
       case _ => cls
     }
   }
+
+  private def providingInstance() = providingClass.getConstructor().newInstance()
+
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -210,7 +213,7 @@ case class DataSource(
 
   /** Returns the name and schema of the source that can be used to continually read data. */
   private def sourceSchema(): SourceInfo = {
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case s: StreamSourceProvider =>
         val (name, schema) = s.sourceSchema(
           sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions)
@@ -264,7 +267,7 @@ case class DataSource(
 
   /** Returns a source that can be used to continually read data. */
   def createSource(metadataPath: String): Source = {
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case s: StreamSourceProvider =>
         s.createSource(
           sparkSession.sqlContext,
@@ -293,7 +296,7 @@ case class DataSource(
 
   /** Returns a sink that can be used to continually write data. */
   def createSink(outputMode: OutputMode): Sink = {
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case s: StreamSinkProvider =>
         s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)
@@ -324,7 +327,7 @@ case class DataSource(
    *                        that files already exist, we don't need to check them again.
    */
   def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
-    val relation = (providingClass.getConstructor().newInstance(), userSpecifiedSchema) match {
+    val relation = (providingInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
       case (dataSource: SchemaRelationProvider, Some(schema)) =>
         dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
@@ -495,7 +498,7 @@ case class DataSource(
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
 
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case dataSource: CreatableRelationProvider =>
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
@@ -532,7 +535,7 @@ case class DataSource(
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
 
-    providingClass.getConstructor().newInstance() match {
+    providingInstance() match {
       case dataSource: CreatableRelationProvider =>
         SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
       case format: FileFormat =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]