[
https://issues.apache.org/jira/browse/PIG-4611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14611792#comment-14611792
]
liyunzhang_intel commented on PIG-4611:
---------------------------------------
[~mohitsabharwal]: With my workaround, HBaseStorage will not always use the
default caster (i.e. Utf8StorageConverter).
If you debug the code with the following steps, you will see the stack traces
below.
1. Append the following line to conf/pig.properties:
pig.hbase.caster=HBaseBinaryConverter
2. Debug the code:
In the 'Spark executor' thread, all objects are deserialized first. The first
time the constructor runs,
[{{defaultCaster}}|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java#L304]
has the default value, Utf8StorageConverter, because
UDFContext.getUDFContext().getClientSystemProps() is null.
{code}
at org.apache.pig.backend.hadoop.hbase.HBaseStorage.<init>(HBaseStorage.java:307)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.pig.impl.PigContext.instantiateFuncFromSpec(PigContext.java:746)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast.instantiateFunc(POCast.java:86)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast.readObject(POCast.java:1993)
at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
{code}
After that, UDFContext#deserialize is called through the following call chain
(PigInputFormatSpark.createRecordReader->PigInputFormat.createRecordReader->MapRedUtil.setupUDFContext->UDFContext.deserialize),
and UDFContext is initialized at that point:
{code}
at org.apache.pig.impl.util.UDFContext.deserialize(UDFContext.java:213)
at org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil.setupUDFContext(MapRedUtil.java:176)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat.createRecordReader(PigInputFormat.java:96)
at org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark.createRecordReader(PigInputFormatSpark.java:41)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:131)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
{code}
Later,
PigInputFormat.createRecordReader->PigInputFormat.getLoadFunc->PigContext.instantiateFuncFromSpec->HBaseStorage.<init>
is called. At that point,
[{{defaultCaster}}|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java#L304]
is HBaseBinaryConverter (the value set in pig.properties).
{code}
at org.apache.pig.backend.hadoop.hbase.HBaseStorage.<init>(HBaseStorage.java:305)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.pig.impl.PigContext.instantiateFuncFromSpec(PigContext.java:746)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat.getLoadFunc(PigInputFormat.java:149)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat.createRecordReader(PigInputFormat.java:97)
at org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark.createRecordReader(PigInputFormatSpark.java:41)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:131)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
{code}
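The ordering shown in the three stack traces can be reproduced in isolation. The following is only a minimal sketch, not Pig code: the class CasterLookupSketch, the static field clientSystemProps and the method simulateDeserialize are hypothetical stand-ins for UDFContext.getUDFContext().getClientSystemProps() and UDFContext.deserialize, and only the null-safe lookup mirrors the workaround.

```java
import java.util.Properties;

/**
 * Hypothetical stand-in for the caster lookup in the HBaseStorage constructor.
 * None of these names are Pig APIs; they only mimic the order of events
 * seen in the stack traces.
 */
public class CasterLookupSketch {
    static final String CASTER_PROPERTY = "pig.hbase.caster";
    static final String STRING_CASTER = "Utf8StorageConverter";

    // Plays the role of UDFContext.getUDFContext().getClientSystemProps():
    // null until the simulated deserialize step has run.
    static Properties clientSystemProps = null;

    // Null-safe lookup: fall back to the default caster when the client
    // properties are not available yet.
    static String resolveCaster() {
        return clientSystemProps != null
                ? clientSystemProps.getProperty(CASTER_PROPERTY, STRING_CASTER)
                : STRING_CASTER;
    }

    // Simulates the point where the client properties become visible
    // on the executor (assumed value taken from pig.properties).
    static void simulateDeserialize() {
        Properties props = new Properties();
        props.setProperty(CASTER_PROPERTY, "HBaseBinaryConverter");
        clientSystemProps = props;
    }

    public static void main(String[] args) {
        // 1. Construction during object deserialization: no properties yet.
        System.out.println("first construction:  " + resolveCaster());
        // 2. The context is populated.
        simulateDeserialize();
        // 3. Construction from the load-function path: configured value is seen.
        System.out.println("second construction: " + resolveCaster());
    }
}
```

In this sketch the first lookup falls back to the default and the second one picks up the configured value, which is the same pattern as HBaseStorage.<init> being reached once from POCast.readObject and once from PigInputFormat.getLoadFunc.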
To debug a pig-hbase script, you need to deploy an HBase environment. It seems
hard to debug the TestHBaseStorage code in the unit test environment (ant
-Dtestcase=$unittest -Dexectype=TestHBaseStorage -DdebugPort=9999
-Dhadoopversion=23 test), and I don't know why.
pig-hbase.pig:
{code}
a = load 'hbase://pigtable_1' using
    org.apache.pig.backend.hadoop.hbase.HBaseStorage('pig:col_a pig:col_b pig:col_c', '-loadKey')
    as (rowKey:chararray, col_a:int, col_b:double, col_c:chararray);
b = FOREACH a GENERATE col_a, col_c;
store b into './testLoadWithProjection_2.out';
{code}
If you don't deploy an HBase environment, you can add logging as follows to
verify this, in
org.apache.pig.backend.hadoop.hbase.HBaseStorage#HBaseStorage(java.lang.String,
java.lang.String):
{code}
public HBaseStorage(String columnList, String optString) throws ParseException,
        IOException {
    ....
    String defaultCaster =
        UDFContext.getUDFContext().getClientSystemProps() != null
            ? UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER)
            : STRING_CASTER;
+   // Log the resolved value instead of dereferencing getClientSystemProps(),
+   // which is null the first time this constructor runs on the executor.
+   LOG.debug("thread:" + Thread.currentThread().getName()
+       + " pig.hbase.caster resolved to:" + defaultCaster);
    ...
}
{code}
> Fix remaining unit test failures about "TestHBaseStorage"
> ---------------------------------------------------------
>
> Key: PIG-4611
> URL: https://issues.apache.org/jira/browse/PIG-4611
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4611.patch
>
>
> In https://builds.apache.org/job/Pig-spark/lastCompletedBuild/testReport/, the
> following unit test failures are reported for TestHBaseStorage:
> org.apache.pig.test.TestHBaseStorage.testStoreToHBase_1_with_delete
> org.apache.pig.test.TestHBaseStorage.testLoadWithProjection_1
> org.apache.pig.test.TestHBaseStorage.testLoadWithProjection_2
> org.apache.pig.test.TestHBaseStorage.testStoreToHBase_2_with_projection
> org.apache.pig.test.TestHBaseStorage.testCollectedGroup
> org.apache.pig.test.TestHBaseStorage.testHeterogeneousScans