[
https://issues.apache.org/jira/browse/HBASE-20295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461602#comment-16461602
]
Mike Drob commented on HBASE-20295:
-----------------------------------
[~tedyu], [~menjin], [~elserj], [~stack] - Question about this implementation,
sorry for showing up a month late to the issue...
The patch adds:
{noformat}
Configuration hConf = context.getConfiguration();
if(hConf == null) {
hConf = this.conf;
}
{noformat}
Are we doing this backwards? If the concern is that sometimes getConf() returns
null, shouldn't we be using that first and checking it's output instead of
pulling the conf from the configuration?
I'm having a pig script that fails against hbase-2.0 with this patch included,
if I revert the patch then my pig works again.
> TableOutputFormat.checkOutputSpecs throw NullPointerException Exception
> -----------------------------------------------------------------------
>
> Key: HBASE-20295
> URL: https://issues.apache.org/jira/browse/HBASE-20295
> Project: HBase
> Issue Type: Bug
> Components: mapreduce
> Affects Versions: 1.4.0
> Environment: Spark 2.2.1, HBase 1.4.0
> Reporter: Michael Jin
> Assignee: Michael Jin
> Priority: Major
> Fix For: 2.0.0
>
> Attachments: HBASE-20295.branch-1.4.001.patch,
> HBASE-20295.master.001.patch, HBASE-20295.master.002.patch,
> HBASE-20295.master.003.patch
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I am using spark write data to HBase by using RDD.
> saveAsNewAPIHadoopDataset function, it works fine with hbase 1.3.1, but when
> update my hbase dependency to 1.4.0 in pom.xml, it throw
> java.lang.NullPointerException, it is caused by a logic error in
> TableOutputFormat.checkOutputSpecs function, please check below details:
> first let's take a look at SparkHadoopMapReduceWriter.write function in
> SparkHadoopMapReduceWriter.scala
> {code:java}
> // SparkHadoopMapReduceWriter.write
> (org.apache.spark.internal.io.SparkHadoopMapReduceWriter.scala)
> def write[K, V: ClassTag](
> rdd: RDD[(K, V)],
> hadoopConf: Configuration): Unit = {
> // Extract context and configuration from RDD.
> val sparkContext = rdd.context
> val stageId = rdd.id
> val sparkConf = rdd.conf
> val conf = new SerializableConfiguration(hadoopConf)
> // Set up a job.
> val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
> val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP,
> 0, 0)
> val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
> val format = jobContext.getOutputFormatClass
> if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) {
> // FileOutputFormat ignores the filesystem parameter
> val jobFormat = format.newInstance
> jobFormat.checkOutputSpecs(jobContext)
> }
> val committer = FileCommitProtocol.instantiate(
> className = classOf[HadoopMapReduceCommitProtocol].getName,
> jobId = stageId.toString,
> outputPath =
> conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
> isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> ...{code}
> in "write" function if output spec validation is enabled, it will call
> checkOutputSpec function in TableOutputFormat class, but the job format is
> simply created by "vall jobFormat = format.newInstance", this will NOT
> initialize "conf" member variable in TableOutputFormat class, let's continue
> check checkOutputSpecs function in TableOutputFormat class
>
> {code:java}
> // TableOutputFormat.checkOutputSpecs
> (org.apache.hadoop.hbase.mapreduce.TableOutputFormat.java) HBASE 1.4.0
> @Override
> public void checkOutputSpecs(JobContext context) throws IOException,
> InterruptedException {
> try (Admin admin =
> ConnectionFactory.createConnection(getConf()).getAdmin()) {
> TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE));
> if (!admin.tableExists(tableName)) {
> throw new TableNotFoundException("Can't write, table does not exist:" +
> tableName.getNameAsString());
> }
> if (!admin.isTableEnabled(tableName)) {
> throw new TableNotEnabledException("Can't write, table is not enabled:
> " +
> tableName.getNameAsString());
> }
> }
> }
> {code}
>
> "ConnectionFactory.createConnection(getConf())", as mentioned above "conf"
> class member is not initialized, so getConf() will return null, so in the
> next UserProvider create instance process, it throw the
> NullPointException(Please part of stack trace at the end), it is a little
> confused that, context passed by function parameter is actually been properly
> constructed, and it contains Configuration object, why context is never used?
> So I suggest to use below code to partly fix this issue:
>
> {code:java}
> // code placeholder
> @Override
> public void checkOutputSpecs(JobContext context) throws IOException,
> InterruptedException {
> Configuration hConf = context.getConfiguration();
> if(hConf == null)
> hConf = this.conf;
> try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) {
> TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
> if (!admin.tableExists(tableName)) {
> throw new TableNotFoundException("Can't write, table does not exist:" +
> tableName.getNameAsString());
> }
> if (!admin.isTableEnabled(tableName)) {
> throw new TableNotEnabledException("Can't write, table is not enabled:
> " +
> tableName.getNameAsString());
> }
> }
> }
> {code}
> In hbase 1.3.1, this issue is not exists because checkOutputSpecs has a blank
> function body
>
>
> Part of stack trace:
> Exception in thread "main" java.lang.NullPointerException
> at
> org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:122)
> at
> org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
> at
> org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
> at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)
> at
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:76)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at
> org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)