[ https://issues.apache.org/jira/browse/HBASE-20295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Jin updated HBASE-20295:
--------------------------------
    Attachment: HBASE-20295.master.002.patch

> TableOutputFormat.checkOutputSpecs throw NullPointerException Exception
> -----------------------------------------------------------------------
>
>                 Key: HBASE-20295
>                 URL: https://issues.apache.org/jira/browse/HBASE-20295
>             Project: HBase
>          Issue Type: Bug
>          Components: mapreduce
>    Affects Versions: 1.4.0
>         Environment: Spark 2.2.1, HBase 1.4.0
>            Reporter: Michael Jin
>            Assignee: Michael Jin
>            Priority: Major
>         Attachments: HBASE-20295.branch-1.4.001.patch, 
> HBASE-20295.master.001.patch, HBASE-20295.master.002.patch
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I am using Spark to write data to HBase via the RDD.saveAsNewAPIHadoopDataset function (a minimal driver sketch that reproduces this usage is attached at the end of this description, after the stack trace). It works fine with HBase 1.3.1, but after updating my HBase dependency to 1.4.0 in pom.xml it throws java.lang.NullPointerException. The exception is caused by a logic error in the TableOutputFormat.checkOutputSpecs function; please check the details below.
> First, let's take a look at the SparkHadoopMapReduceWriter.write function in SparkHadoopMapReduceWriter.scala:
> {code:java}
> // SparkHadoopMapReduceWriter.write (org.apache.spark.internal.io.SparkHadoopMapReduceWriter.scala)
> def write[K, V: ClassTag](
>     rdd: RDD[(K, V)],
>     hadoopConf: Configuration): Unit = {
>   // Extract context and configuration from RDD.
>   val sparkContext = rdd.context
>   val stageId = rdd.id
>   val sparkConf = rdd.conf
>   val conf = new SerializableConfiguration(hadoopConf)
>   // Set up a job.
>   val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
>   val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0)
>   val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
>   val format = jobContext.getOutputFormatClass
>   if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) {
>     // FileOutputFormat ignores the filesystem parameter
>     val jobFormat = format.newInstance
>     jobFormat.checkOutputSpecs(jobContext)
>   }
>   val committer = FileCommitProtocol.instantiate(
>     className = classOf[HadoopMapReduceCommitProtocol].getName,
>     jobId = stageId.toString,
>     outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>     isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
>   committer.setupJob(jobContext)
> ...{code}
> In the "write" function, if output spec validation is enabled, it calls the checkOutputSpecs function of the TableOutputFormat class. But the job format is created simply by "val jobFormat = format.newInstance", which does NOT initialize the "conf" member variable of the TableOutputFormat class (a minimal sketch of that difference follows the listing below). Let's continue with the checkOutputSpecs function in the TableOutputFormat class:
>  
> {code:java}
> // TableOutputFormat.checkOutputSpecs (org.apache.hadoop.hbase.mapreduce.TableOutputFormat.java) HBASE 1.4.0
> @Override
> public void checkOutputSpecs(JobContext context) throws IOException,
>     InterruptedException {
>   try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) {
>     TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE));
>     if (!admin.tableExists(tableName)) {
>       throw new TableNotFoundException("Can't write, table does not exist:" +
>           tableName.getNameAsString());
>     }
>     if (!admin.isTableEnabled(tableName)) {
>       throw new TableNotEnabledException("Can't write, table is not enabled: " +
>           tableName.getNameAsString());
>     }
>   }
> }
> {code}
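>
> To make the initialization difference concrete, here is a minimal sketch (my own illustration, assuming HBase 1.4.0 client jars on the classpath; "demo_table" is just a placeholder and no cluster connection is made here):
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
> import org.apache.hadoop.util.ReflectionUtils;
>
> public class ConfInitDemo {
>   public static void main(String[] args) {
>     Configuration conf = HBaseConfiguration.create();
>     conf.set(TableOutputFormat.OUTPUT_TABLE, "demo_table"); // placeholder table name
>
>     // What Spark's "format.newInstance" effectively does: setConf() is never called,
>     // so the "conf" field stays null and getConf() returns null.
>     TableOutputFormat<Object> bare = new TableOutputFormat<>();
>     System.out.println(bare.getConf()); // prints: null
>
>     // What a Configurable-aware factory does: ReflectionUtils.newInstance()
>     // calls setConf() because TableOutputFormat implements Configurable.
>     TableOutputFormat<?> viaReflectionUtils =
>         ReflectionUtils.newInstance(TableOutputFormat.class, conf);
>     System.out.println(viaReflectionUtils.getConf() != null); // prints: true
>   }
> }
> {code}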
>  
> In "ConnectionFactory.createConnection(getConf())": as mentioned above, the "conf" class member is never initialized, so getConf() returns null, and the subsequent UserProvider instantiation throws the NullPointerException (please see part of the stack trace at the end). It is a little confusing that the context passed in as a function parameter has actually been properly constructed and contains a Configuration object, yet the context is never used. So I suggest using the code below to partly fix this issue:
>  
> {code:java}
> // Suggested fix: prefer the Configuration carried by the JobContext
> @Override
> public void checkOutputSpecs(JobContext context) throws IOException,
>     InterruptedException {
>   Configuration hConf = context.getConfiguration();
>   if(hConf == null)
>     hConf = this.conf;
>   try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) {
>     TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
>     if (!admin.tableExists(tableName)) {
>       throw new TableNotFoundException("Can't write, table does not exist:" +
>               tableName.getNameAsString());
>     }
>     if (!admin.isTableEnabled(tableName)) {
>       throw new TableNotEnabledException("Can't write, table is not enabled: " +
>               tableName.getNameAsString());
>     }
>   }
> }
> {code}
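>
> Until the fix is in, a possible driver-side mitigation (my own suggestion, not part of the attached patch) is to disable Spark's output spec validation, which skips the jobFormat.checkOutputSpecs call shown in the write function above; if I read SparkHadoopWriterUtils correctly, the flag it checks is spark.hadoop.validateOutputSpecs:
> {code:java}
> import org.apache.spark.SparkConf;
>
> public class DisableSpecValidation {
>   public static void main(String[] args) {
>     // Workaround only: the checkOutputSpecs() bug itself is untouched,
>     // Spark simply stops calling it before the actual write.
>     SparkConf sparkConf = new SparkConf()
>         .setAppName("hbase-write")                         // placeholder app name
>         .set("spark.hadoop.validateOutputSpecs", "false"); // default is "true"
>     System.out.println(sparkConf.get("spark.hadoop.validateOutputSpecs"));
>   }
> }
> {code}
>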
> In HBase 1.3.1 this issue does not exist, because checkOutputSpecs has an empty function body.
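> For reference, that empty body amounts to the following (reconstructed from the description above, not copied verbatim from the 1.3.1 source):
> {code:java}
> // TableOutputFormat.checkOutputSpecs in HBase 1.3.1: empty, so nothing here can dereference the uninitialized conf
> @Override
> public void checkOutputSpecs(JobContext context) throws IOException,
>     InterruptedException {
>   // no-op in 1.3.1
> }
> {code}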
>  
>  
> Part of stack trace:
> Exception in thread "main" java.lang.NullPointerException
>  at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:122)
>  at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
>  at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
>  at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)
>  at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:76)
>  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>  at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>  
>  
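> For completeness, here is a minimal driver sketch that goes through the code path above (my own reproducer outline, assuming Spark 2.2.1 and HBase 1.4.0 client jars on the classpath; table, family and qualifier names are placeholders):
> {code:java}
> import java.util.Collections;
>
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import scala.Tuple2;
>
> public class Hbase20295Repro {
>   public static void main(String[] args) throws Exception {
>     SparkConf sparkConf = new SparkConf().setAppName("hbase-20295-repro").setMaster("local[1]");
>     try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {
>       // Standard TableOutputFormat setup for saveAsNewAPIHadoopDataset.
>       Job job = Job.getInstance();
>       job.setOutputFormatClass(TableOutputFormat.class);
>       job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "demo_table");
>
>       Put put = new Put(Bytes.toBytes("row1"));
>       put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
>
>       Tuple2<ImmutableBytesWritable, Put> pair =
>           new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes("row1")), put);
>       JavaPairRDD<ImmutableBytesWritable, Put> rdd =
>           jsc.parallelizePairs(Collections.singletonList(pair));
>
>       // With HBase 1.4.0 this call reaches TableOutputFormat.checkOutputSpecs()
>       // on an instance whose conf was never set, producing the NPE above.
>       rdd.saveAsNewAPIHadoopDataset(job.getConfiguration());
>     }
>   }
> }
> {code}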



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
