[
https://issues.apache.org/jira/browse/KUDU-2518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Grant Henke updated KUDU-2518:
------------------------------
Labels: kudu-roadmap (was: )
> SparkSQL queries without temporary tables
> -----------------------------------------
>
> Key: KUDU-2518
> URL: https://issues.apache.org/jira/browse/KUDU-2518
> Project: Kudu
> Issue Type: Improvement
> Components: hms, spark
> Affects Versions: 1.7.1
> Reporter: Dan Burkert
> Priority: Major
> Labels: kudu-roadmap
>
> One long-standing ergonomic issue with the Kudu/SparkSQL integration is the
> requirement to register Kudu tables as temp tables before they can be scanned
> using a SQL string ({{sql("SELECT * FROM my_kudu_table")}}). Ideally
> SparkSQL could query Kudu tables that it discovers via the HMS with no
> additional configuration. Yesterday I explored what it would take to get
> there, and I found some interesting things.
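> 
> For context, the temp-table workflow looks roughly like the following (an illustrative sketch, not lifted from a real session; the master address and table name are placeholders):
> {code:java}
> // Load the Kudu table into a DataFrame by spelling out the connection
> // options, then register it as a temporary view so SparkSQL can see it.
> val df = spark.read
>   .options(Map(
>     "kudu.master" -> "kudu-master-host:7051",   // placeholder master address
>     "kudu.table"  -> "my_kudu_table"))          // placeholder table name
>   .format("org.apache.kudu.spark.kudu")
>   .load()
> df.createOrReplaceTempView("my_kudu_table")
>
> // Only after the explicit registration does a plain SQL string work.
> spark.sql("SELECT * FROM my_kudu_table").show()
> {code}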
>
> If the HMS table contains a {{spark.sql.sources.provider}} table property
> with a value like {{org.apache.kudu.spark.kudu.DefaultSource}}, SparkSQL will
> automatically instantiate the corresponding {{RelationProvider}} class,
> passing it a {{SQLContext}} and a map of parameters that it fills in with the
> table's HDFS URI and storage properties. The current plan for the Kudu + HMS
> integration (KUDU-2191) is not to set any storage properties; instead,
> attributes such as the master addresses and table ID will be stored as table
> properties. As a result, SparkSQL instantiates a Kudu {{DefaultSource}} but
> doesn't pass it the arguments it needs, such as the table name or master
> addresses. Getting this far required adding a dummy
> {{org.apache.kudu.hive.KuduStorageHandler}} class to the classpath so that
> the Hive client wouldn't choke on the bogus class name. The stacktrace from
> Spark attempting to instantiate the {{DefaultSource}} is provided below.
>
> {code:java}
> Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
> Spark context available as 'sc' (master = local[*], app id = local-1532719985143).
> Spark session available as 'spark'.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/ '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
>       /_/
>
> Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
> Type in expressions to have them evaluated.
> Type :help for more information.
> scala> sql("DESCRIBE TABLE t1")
> org.spark_project.guava.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'. parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None
>   at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2263)
>   at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>   at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
>   at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
>   at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
>   at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
>   at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
>   at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
>   at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
>   ... 49 elided
> Caused by: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'. parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None
>   at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
>   at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
>   at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>   at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.getOrElse(CaseInsensitiveMap.scala:28)
>   at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:81)
>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
>   at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
>   at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>   at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>   at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>   at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>   ... 96 more
> scala>{code}
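> 
> To make the failure concrete: when Spark sees the provider property it instantiates the {{RelationProvider}} and calls {{createRelation}} with whatever parameter map it assembled, and in the HMS case that map is empty. The sketch below only illustrates the shape of that call with the options filled in; the master address and table name are placeholders:
> {code:java}
> // What Spark effectively does for a provider-backed table: instantiate the
> // RelationProvider and hand it a parameter map. With the planned HMS
> // integration that map is empty, which triggers the IllegalArgumentException
> // above. With the options present, the relation resolves fine:
> val relation = new org.apache.kudu.spark.kudu.DefaultSource().createRelation(
>   spark.sqlContext,
>   Map(
>     "kudu.master" -> "kudu-master-host:7051",  // placeholder master address
>     "kudu.table"  -> "my_kudu_table"))         // placeholder table name
> {code}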
>
> After striking out with the existing interfaces, I looked at the
> {{DataSourceRegister}} API, which is part of the {{DataSourceV2}} effort
> underway in Spark. It's not clear that this API actually provides more
> context when creating relations (we need the table name and master addresses
> from the table properties, yet options are still just passed as a map in
> {{DataSourceOptions}}). More significantly, the {{spark.sql.sources.provider}}
> property doesn't appear to work correctly with {{DataSourceV2}}
> implementations at all; Spark rejects the class when resolving the relation:
>
> {code:java}
> Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
> Spark context available as 'sc' (master = local[*], app id = local-1532720634224).
> Spark session available as 'spark'.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/ '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
>       /_/
>
> Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
> Type in expressions to have them evaluated.
> Type :help for more information.
> scala> sql("DESCRIBE TABLE t1")
> org.apache.spark.sql.AnalysisException: org.apache.kudu.spark.KuduDataSource is not a valid Spark SQL Data Source.;
>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:415)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
>   at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
>   at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>   at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>   at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>   at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>   at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>   at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
>   at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
>   at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
>   at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
>   at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
>   at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
>   at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
>   at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
>   at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
>   ... 49 elided
> scala>{code}
>
> {{org.apache.kudu.spark.KuduDataSource}} is a dummy class I put on the
> classpath and set as the {{spark.sql.sources.provider}} table property in the
> Hive metastore:
>
> {code:java}
> import java.util
>
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.sources.DataSourceRegister
> import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
> import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
> import org.apache.spark.sql.types.StructType
>
> class KuduDataSource extends DataSourceV2
>     with DataSourceRegister
>     with ReadSupport {
>
>   override def shortName(): String = "kudu"
>
>   override def createReader(options: DataSourceOptions): DataSourceReader = {
>     new KuduDataSourceReader(options)
>   }
> }
>
> class KuduDataSourceReader(val options: DataSourceOptions) extends DataSourceReader {
>   // Stubs: the experiment never got far enough for these to be called.
>   override def readSchema(): StructType = ???
>   override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = ???
> }
> {code}
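> 
> One more note on the {{DataSourceOptions}} point above: even if Spark accepted the {{DataSourceV2}} class, the reader would still only see a flat, case-insensitive string map, so something would still have to put {{kudu.table}} and {{kudu.master}} into it. A hypothetical sketch (the class name is made up, and it reuses the reader stub above):
> {code:java}
> // Hypothetical: even if Spark accepted the V2 class, createReader() only gets
> // a DataSourceOptions, i.e. a case-insensitive String -> String map with no
> // access to HMS table properties, so the same keys would still have to be
> // supplied by someone.
> class KuduDataSourceV2Sketch extends DataSourceV2 with ReadSupport {
>   override def createReader(options: DataSourceOptions): DataSourceReader = {
>     // Both lookups return java.util.Optional[String]; they would be empty if
>     // the map arrived empty, as it does in the V1 path above.
>     val table  = options.get("kudu.table")
>     val master = options.get("kudu.master")
>     require(table.isPresent && master.isPresent,
>       "kudu.table and kudu.master must be supplied in the options map")
>     new KuduDataSourceReader(options)
>   }
> }
> {code}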
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)