[
https://issues.apache.org/jira/browse/HBASE-20295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416539#comment-16416539
]
Ted Yu edited comment on HBASE-20295 at 3/28/18 1:07 AM:
---------------------------------------------------------
Have you read the comment I mentioned?
Why not use submit-patch.py?
was (Author: [email protected]):
Have you read the comment I mentioned?
Why not use submit-patch.sh?
> TableOutputFormat.checkOutputSpecs throw NullPointerException Exception
> -----------------------------------------------------------------------
>
> Key: HBASE-20295
> URL: https://issues.apache.org/jira/browse/HBASE-20295
> Project: HBase
> Issue Type: Bug
> Components: Client
> Affects Versions: 1.4.0
> Environment: Spark 2.2.1, HBase 1.4.0
> Reporter: Michael Jin
> Priority: Major
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I am using Spark to write data to HBase via the RDD.saveAsNewAPIHadoopDataset function.
> It works fine with HBase 1.3.1, but after updating the HBase dependency in pom.xml to
> 1.4.0 it throws java.lang.NullPointerException. The exception is caused by a logic error
> in the TableOutputFormat.checkOutputSpecs function; details below.
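> A minimal reproduction sketch (using the Java Spark API for illustration; the table name
> "test_table", column family "cf", and the local master are placeholders, not my exact job):
>
> {code:java}
> import java.io.IOException;
> import java.util.Arrays;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import scala.Tuple2;
>
> public class HBaseWriteRepro {
>   public static void main(String[] args) throws IOException {
>     JavaSparkContext sc = new JavaSparkContext("local[2]", "hbase-write-repro");
>
>     // TableOutputFormat reads the target table from OUTPUT_TABLE in the job configuration.
>     Configuration hbaseConf = HBaseConfiguration.create();
>     hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "test_table"); // placeholder table name
>     Job job = Job.getInstance(hbaseConf);
>     job.setOutputFormatClass(TableOutputFormat.class);
>     job.setOutputKeyClass(ImmutableBytesWritable.class);
>     job.setOutputValueClass(Put.class);
>
>     // Build (rowkey, Put) pairs.
>     JavaPairRDD<ImmutableBytesWritable, Put> puts =
>         sc.parallelize(Arrays.asList("r1", "r2")).mapToPair(row -> {
>           Put put = new Put(Bytes.toBytes(row));
>           put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes(row));
>           return new Tuple2<>(new ImmutableBytesWritable(put.getRow()), put);
>         });
>
>     // Works with HBase 1.3.1; with 1.4.0 it fails in TableOutputFormat.checkOutputSpecs
>     // with a NullPointerException before anything is written.
>     puts.saveAsNewAPIHadoopDataset(job.getConfiguration());
>
>     sc.stop();
>   }
> }
> {code}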
> First, let's take a look at the SparkHadoopMapReduceWriter.write function in
> SparkHadoopMapReduceWriter.scala:
> {code:java}
> // SparkHadoopMapReduceWriter.write
> // (org.apache.spark.internal.io.SparkHadoopMapReduceWriter.scala)
> def write[K, V: ClassTag](
>     rdd: RDD[(K, V)],
>     hadoopConf: Configuration): Unit = {
>   // Extract context and configuration from RDD.
>   val sparkContext = rdd.context
>   val stageId = rdd.id
>   val sparkConf = rdd.conf
>   val conf = new SerializableConfiguration(hadoopConf)
>
>   // Set up a job.
>   val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
>   val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0)
>   val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
>   val format = jobContext.getOutputFormatClass
>
>   if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) {
>     // FileOutputFormat ignores the filesystem parameter
>     val jobFormat = format.newInstance
>     jobFormat.checkOutputSpecs(jobContext)
>   }
>
>   val committer = FileCommitProtocol.instantiate(
>     className = classOf[HadoopMapReduceCommitProtocol].getName,
>     jobId = stageId.toString,
>     outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>     isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
>   committer.setupJob(jobContext)
>   ...{code}
> in "write" function if output spec validation is enabled, it will call
> checkOutputSpec function in TableOutputFormat class, but the job format is
> simply created by "vall jobFormat = format.newInstance", this will NOT
> initialize "conf" member variable in TableOutputFormat class, let's continue
> check checkOutputSpecs function in TableOutputFormat class
>
> {code:java}
> // TableOutputFormat.checkOutputSpecs
> // (org.apache.hadoop.hbase.mapreduce.TableOutputFormat.java), HBase 1.4.0
> @Override
> public void checkOutputSpecs(JobContext context) throws IOException,
>     InterruptedException {
>   try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) {
>     TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE));
>     if (!admin.tableExists(tableName)) {
>       throw new TableNotFoundException("Can't write, table does not exist:" +
>           tableName.getNameAsString());
>     }
>     if (!admin.isTableEnabled(tableName)) {
>       throw new TableNotEnabledException("Can't write, table is not enabled: " +
>           tableName.getNameAsString());
>     }
>   }
> }
> {code}
>
> "ConnectionFactory.createConnection(getConf())", as mentioned above "conf"
> class member is not initialized, so getConf() will return null, so in the
> next UserProvider create instance process, it throw the
> NullPointException(Please part of stack trace at the end), it is a little
> confused that, context passed by function parameter is actually been properly
> constructed, and it contains Configuration object, why context is never used?
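> To make the newInstance point concrete, here is a small illustrative sketch (not HBase or
> Spark code; it only relies on TableOutputFormat implementing
> org.apache.hadoop.conf.Configurable, so its conf is populated only when setConf() is
> called on the new instance; "test_table" is a placeholder):
>
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
> import org.apache.hadoop.util.ReflectionUtils;
>
> public class NewInstanceSketch {
>   public static void main(String[] args) throws Exception {
>     Configuration conf = HBaseConfiguration.create();
>     conf.set(TableOutputFormat.OUTPUT_TABLE, "test_table"); // placeholder table name
>
>     // What the Spark validation path effectively does: a bare newInstance().
>     TableOutputFormat<?> bare = TableOutputFormat.class.newInstance();
>     System.out.println(bare.getConf()); // null -> later NPE in createConnection(getConf())
>
>     // A Configurable-aware instantiation would have called setConf() instead.
>     TableOutputFormat<?> configured = ReflectionUtils.newInstance(TableOutputFormat.class, conf);
>     System.out.println(configured.getConf() != null); // true
>   }
> }
> {code}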
> So I suggest the code below to partly fix this issue:
>
> {code:java}
> // Suggested fix: prefer the Configuration carried by the JobContext and fall
> // back to this.conf only when the context does not provide one.
> @Override
> public void checkOutputSpecs(JobContext context) throws IOException,
>     InterruptedException {
>   Configuration hConf = context.getConfiguration();
>   if (hConf == null) {
>     hConf = this.conf;
>   }
>   try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) {
>     TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
>     if (!admin.tableExists(tableName)) {
>       throw new TableNotFoundException("Can't write, table does not exist:" +
>           tableName.getNameAsString());
>     }
>     if (!admin.isTableEnabled(tableName)) {
>       throw new TableNotEnabledException("Can't write, table is not enabled: " +
>           tableName.getNameAsString());
>     }
>   }
> }
> {code}
> In HBase 1.3.1 this issue does not exist, because checkOutputSpecs has an empty function
> body.
>
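> As a user-side workaround (not a fix for the HBase bug), the standard Spark setting
> spark.hadoop.validateOutputSpecs can be set to false so that the checkOutputSpecs call is
> skipped entirely, at the cost of losing the table-existence checks. A minimal sketch:
>
> {code:java}
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaSparkContext;
>
> public class DisableValidationWorkaround {
>   public static void main(String[] args) {
>     // Spark only invokes checkOutputSpecs when output-spec validation is enabled.
>     SparkConf sparkConf = new SparkConf()
>         .setMaster("local[2]")                             // placeholder master for the sketch
>         .setAppName("hbase-write-workaround")
>         .set("spark.hadoop.validateOutputSpecs", "false"); // skip the NPE-prone validation
>     JavaSparkContext sc = new JavaSparkContext(sparkConf);
>
>     // ... build the (rowkey, Put) RDD and call saveAsNewAPIHadoopDataset as before ...
>
>     sc.stop();
>   }
> }
> {code}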
>
> Part of stack trace:
> Exception in thread "main" java.lang.NullPointerException
> at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:122)
> at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
> at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
> at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)
> at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:76)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>
>