This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new b834c3f83a5 [SPARK-32380][SQL] Fixing access of HBase table via Hive 
from Spark
b834c3f83a5 is described below

commit b834c3f83a5cf49d64ba16076a3016b60c7a48a7
Author: attilapiros <[email protected]>
AuthorDate: Sat Nov 5 21:29:02 2022 +0900

    [SPARK-32380][SQL] Fixing access of HBase table via Hive from Spark
    
    This is an update of https://github.com/apache/spark/pull/29178 which was 
closed because the root cause of the error was just vaguely defined there but 
here I will give an explanation why `HiveHBaseTableInputFormat` does not work 
well with the `NewHadoopRDD` (see in the next section).
    
    This PR modifies `TableReader.scala` to create `OldHadoopRDD` when the input 
format is 'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat'.
    
    - environments (Cloudera distribution 7.1.7.SP1):
    hadoop 3.1.1
    hive 3.1.300
    spark 3.2.1
    hbase 2.2.3
    
    With the `NewHadoopRDD` the following exception is raised:
    
    ```
    java.io.IOException: Cannot create a record reader because of a previous 
error. Please look at the previous logs lines from the task's full log for more 
details.
      at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:253)
      at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
      at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
      at scala.Option.getOrElse(Option.scala:189)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
      at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
      at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
      at scala.Option.getOrElse(Option.scala:189)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
      at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
      at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
      at scala.Option.getOrElse(Option.scala:189)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
      at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
      at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
      at scala.Option.getOrElse(Option.scala:189)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
      at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
      at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
      at scala.Option.getOrElse(Option.scala:189)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
      at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:446)
      at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
      at 
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
      at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
      at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
      at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
      at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
      at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
      at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
      at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
      at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
      at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
      at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
      at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
      at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
      at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
      at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
      at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
      at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
      ... 47 elided
    Caused by: java.lang.IllegalStateException: The input format instance has 
not been properly initialized. Ensure you call initializeTable either in your 
constructor or initialize method
      at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:557)
      at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:248)
      ... 86 more
    ```
    
    There are two interfaces:
    
    - the new `org.apache.hadoop.mapreduce.InputFormat`: providing a one arg 
method `getSplits(JobContext context)` (returning `List<InputSplit>`)
    - the old `org.apache.hadoop.mapred.InputFormat`: providing a two arg 
method `getSplits(JobConf job, int numSplits)` (returning `InputSplit[]`)
    
    And in Hive both are implemented by `HiveHBaseTableInputFormat` but only 
the old method leads to the required initialisation, and this is why 
`NewHadoopRDD` fails here.
    
    All the links here refer to the latest commits of the master branches for 
the mentioned components at the time of writing this description (to get the 
right line numbers in the future too, as `master` itself is a moving target).
    
    Spark in `NewHadoopRDD` uses the new interface providing the one arg method:
    
https://github.com/apache/spark/blob/5556cfc59aa97a3ad4ea0baacebe19859ec0bcb7/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136
    
    Hive on the other hand binds the initialisation to the two args method 
coming from the old interface.
    See 
[Hive#getSplits](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L268):
    
    ```
      @Override public InputSplit[] getSplits(final JobConf jobConf, final int 
numSplits) throws IOException {
    ```
    
    This calls `getSplitsInternal` which contains the 
[initialisation](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L299)
 too:
    ```
        initializeTable(conn, tableName);
    ```
    
    Interestingly, Hive also uses the one arg method internally within 
`getSplitsInternal` 
[here](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L356)
 but the initialisation is done earlier.
    
    By calling the new interface method (what `NewHadoopRDD` does) the call 
goes straight to the HBase method: 
[org.apache.hadoop.hbase.mapreduce.TableInputFormatBase#getSplits](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L230).
    
    Where there would be some `JobContext` based 
[initialisation](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L234-L237)
    which by default is an [empty 
method](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L628-L640):
    ```java
      /**
       * Handle subclass specific set up. Each of the entry points used by the 
MapReduce framework,
       * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link 
#getSplits(JobContext)},
       * will call {@link #initialize(JobContext)} as a convenient centralized 
location to handle
       * retrieving the necessary configuration information and calling
       * {@link #initializeTable(Connection, TableName)}. Subclasses should 
implement their initialize
       * call such that it is safe to call multiple times. The current 
TableInputFormatBase
       * implementation relies on a non-null table reference to decide if an 
initialize call is needed,
       * but this behavior may change in the future. In particular, it is 
critical that initializeTable
       * not be called multiple times since this will leak Connection instances.
       */
      protected void initialize(JobContext context) throws IOException {
      }
    ```
    
    This is not overridden by Hive and it is hard to reason about why we need 
that (it's an internal Hive class), so it is easier to fix this in Spark.
    
    No.
    
    1) create hbase table
    
    ```
     hbase(main):001:0> create 'hbase_test', 'cf1'
     hbase(main):001:0> put 'hbase_test', 'r1', 'cf1:v1', '123'
    ```
    
    2) create hive table related to hbase table
    
    hive>
    ```
    CREATE EXTERNAL TABLE `hivetest.hbase_test`(
      `key` string COMMENT '',
      `value` string COMMENT '')
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.hbase.HBaseSerDe'
    STORED BY
      'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES (
      'hbase.columns.mapping'=':key,cf1:v1',
      'serialization.format'='1')
    TBLPROPERTIES (
      'hbase.table.name'='hbase_test')
    ```
     
    
    3) query the Hive table from spark-shell while the data is in HBase
    
    ```
    scala> spark.sql("select * from hivetest.hbase_test").show()
    22/11/05 01:14:16 WARN conf.HiveConf: HiveConf of name hive.masking.algo 
does not exist
    22/11/05 01:14:16 WARN client.HiveClientImpl: Detected HiveConf 
hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless 
hive logic
    Hive Session ID = f05b6866-86df-4d88-9eea-f1c45043bb5f
    +---+-----+
    |key|value|
    +---+-----+
    | r1|  123|
    +---+-----+
    ```
    
    Closes #38516 from attilapiros/SPARK-32380.
    
    Authored-by: attilapiros <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
    (cherry picked from commit 7009ef0510dae444c72e7513357e681b08379603)
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../main/scala/org/apache/spark/sql/hive/TableReader.scala | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index a04802681e4..4c9791387ca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -300,6 +300,16 @@ class HadoopTableReader(
     }
   }
 
+  /**
+   * True if the new org.apache.hadoop.mapreduce.InputFormat is implemented 
(except
+   * HiveHBaseTableInputFormat where although the new interface is implemented 
by base HBase class
+   * the table initialization in the Hive layer only happens via the old 
interface methods -
+   * for more details see SPARK-32380).
+   */
+  private def compatibleWithNewHadoopRDD(inputClass: Class[_ <: 
oldInputClass[_, _]]): Boolean =
+    classOf[newInputClass[_, _]].isAssignableFrom(inputClass) &&
+      
!inputClass.getName.equalsIgnoreCase("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat")
+
   /**
    * The entry of creating a RDD.
    * [SPARK-26630] Using which HadoopRDD will be decided by the input format 
of tables.
@@ -308,7 +318,7 @@ class HadoopTableReader(
    */
   private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: 
String): RDD[Writable] = {
     val inputFormatClazz = localTableDesc.getInputFileFormatClass
-    if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
+    if (compatibleWithNewHadoopRDD(inputFormatClazz)) {
       createNewHadoopRDD(localTableDesc, inputPathStr)
     } else {
       createOldHadoopRDD(localTableDesc, inputPathStr)
@@ -317,7 +327,7 @@ class HadoopTableReader(
 
   private def createHadoopRDD(partitionDesc: PartitionDesc, inputPathStr: 
String): RDD[Writable] = {
     val inputFormatClazz = partitionDesc.getInputFileFormatClass
-    if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
+    if (compatibleWithNewHadoopRDD(inputFormatClazz)) {
       createNewHadoopRDD(partitionDesc, inputPathStr)
     } else {
       createOldHadoopRDD(partitionDesc, inputPathStr)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to