Dan Burkert created KUDU-2518:
---------------------------------

             Summary: SparkSQL queries without temporary tables
                 Key: KUDU-2518
                 URL: https://issues.apache.org/jira/browse/KUDU-2518
             Project: Kudu
          Issue Type: Improvement
          Components: hms, spark
    Affects Versions: 1.7.1
            Reporter: Dan Burkert


One long-standing ergonomic issue with the Kudu/SparkSQL integration is the 
requirement to register Kudu tables as temp tables before they can be scanned 
using a SQL string ({{sql("SELECT * FROM my_kudu_table")}}).  Ideally SparkSQL 
could query Kudu tables that it discovers via the HMS with no additional 
configuration.  Yesterday I explored what it would take to get there, and I 
found some interesting things.
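To illustrate the ergonomic issue, today a Kudu table has to be loaded and registered as a temporary view before SparkSQL can see it. A rough sketch of that workaround (the master address and table name are placeholders, and the fully-qualified format name assumes the kudu-spark package as of 1.7):

```scala
// Sketch of the current workaround (placeholder master address and table name):
// the Kudu table must be loaded as a DataFrame and registered as a temporary
// view before it is visible to sql(...) queries.
val df = spark.read
  .format("org.apache.kudu.spark.kudu")
  .option("kudu.master", "kudu-master:7051")
  .option("kudu.table", "my_kudu_table")
  .load()
df.createOrReplaceTempView("my_kudu_table")
spark.sql("SELECT * FROM my_kudu_table").show()
```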

 

If the HMS table contains a {{spark.sql.sources.provider}} table property with 
a value like {{org.apache.kudu.spark.kudu.DefaultSource}}, SparkSQL will 
automatically instantiate the corresponding {{RelationProvider}} class, passing 
it a {{SQLContext}} and a map of parameters, which Spark fills in with the 
table's HDFS URI and storage properties.  The current plan for Kudu + HMS 
integration (KUDU-2191) is not to set any storage properties; instead, 
attributes like master addresses and the table ID will be stored as table 
properties.  As a result, SparkSQL instantiates a Kudu {{DefaultSource}} but 
does not pass it the necessary arguments, such as the table name or master 
addresses.  Getting this far required adding a dummy 
{{org.apache.kudu.hive.KuduStorageHandler}} class to the classpath so that the 
Hive client wouldn't choke on the bogus class name.  The stacktrace from Spark 
attempting to instantiate the {{DefaultSource}} is provided below.

 
{code:java}
Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
Spark context available as 'sc' (master = local[*], app id = local-1532719985143).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sql("DESCRIBE TABLE t1")
org.spark_project.guava.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'.  parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None
  at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2263)
  at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
  at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
  at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
  at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
  ... 49 elided
Caused by: java.lang.IllegalArgumentException: Kudu table name must be specified in create options using key 'kudu.table'.  parameters: Map(), parameters-size: 0, parameters-keys: Set(), path: None
  at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
  at org.apache.kudu.spark.kudu.DefaultSource$$anonfun$11.apply(DefaultSource.scala:82)
  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
  at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.getOrElse(CaseInsensitiveMap.scala:28)
  at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:81)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
  at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
  at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
  at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
  at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
  at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
  ... 96 more

scala>{code}
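For reference, the root cause above is simply a required-option lookup on the (empty) parameters map that Spark hands to {{createRelation}}. A minimal pure-Scala sketch of that lookup pattern (a simplification for illustration, not the actual {{DefaultSource.scala}} code):

```scala
// parameters as Spark passes them to createRelation; empty here, exactly as
// shown in the trace ("parameters: Map()").
val parameters: Map[String, String] = Map()

// getOrElse with a by-name default that throws, mirroring the error message
// in the stacktrace above.
def tableName(params: Map[String, String]): String =
  params.getOrElse("kudu.table",
    throw new IllegalArgumentException(
      "Kudu table name must be specified in create options using key 'kudu.table'"))
```

With an empty map this throws the {{IllegalArgumentException}} seen above; with {{Map("kudu.table" -> "t1")}} it returns the table name.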
 

After striking out with the existing interfaces, I looked at the 
{{DataSourceRegister}} API, which is part of the {{DataSourceV2}} effort 
underway in Spark.  It's not clear that this API actually provides more 
context when creating relations (we need the table name and master addresses 
from the table properties, and options are still just passed as a map in 
{{DataSourceOptions}}).  More significantly, the {{spark.sql.sources.provider}} 
property doesn't appear to work correctly with {{DataSourceV2}} instances; 
resolution fails with an {{AnalysisException}}:

 
{code:java}
Spark context Web UI available at http://kudu-hms-1.gce.cloudera.com:4041
Spark context available as 'sc' (master = local[*], app id = local-1532720634224).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sql("DESCRIBE TABLE t1")
org.apache.spark.sql.AnalysisException: org.apache.kudu.spark.KuduDataSource is not a valid Spark SQL Data Source.;
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:415)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:242)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anon$1.call(DataSourceStrategy.scala:227)
  at org.spark_project.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
  at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
  at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
  at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
  at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
  at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
  at org.spark_project.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:137)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:227)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:264)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:255)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:255)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:223)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
  at org.apache.spark.sql.SparkSession.table(SparkSession.scala:627)
  at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:548)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
  ... 49 elided

scala>{code}
 

{{org.apache.kudu.spark.KuduDataSource}} is a dummy class that I put on the 
classpath and set as the value of the table's {{spark.sql.sources.provider}} 
property in the Hive Metastore:

 
{code:java}
import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.StructType

// Dummy DataSourceV2 implementation; the reader methods are intentionally
// unimplemented, since resolution fails before they are ever called.
class KuduDataSource extends DataSourceV2
    with DataSourceRegister
    with ReadSupport {

  override def shortName(): String = "kudu"

  override def createReader(options: DataSourceOptions): DataSourceReader =
    new KuduDataSourceReader(options)
}

class KuduDataSourceReader(val options: DataSourceOptions) extends DataSourceReader {

  override def readSchema(): StructType = ???

  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = ???
}
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
