[
https://issues.apache.org/jira/browse/HIVE-21191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
one updated HIVE-21191:
-----------------------
Description:
I want a distinctLag function. It is like lag, but instead of returning the
immediately preceding value, it returns the nearest preceding value that
differs from the current one.
Example:
{color:#14892c}select * from active{color}
||session||sq||channel||
|1|1|A|
|1|2|B|
|1|3|B|
|1|4|C|
|1|5|B|
|2|1|C|
|2|2|B|
|2|3|B|
|2|4|A|
|2|5|B|
{color:#14892c}select session,sq,lag(channel) over (partition by session order by sq) from active{color}
||session||sq||channel||
|1|1|null|
|1|2|A|
|1|3|B|
|1|4|B|
|1|5|C|
|2|1|null|
|2|2|C|
|2|3|B|
|2|4|B|
|2|5|A|
The function I want is:
{color:#14892c}select session,sq,distinctLag(channel) over (partition by session order by sq) from active{color}
||session||sq||channel||
|1|1|null|
|1|2|A|
|1|3|A|
|1|4|B|
|1|5|C|
|2|1|null|
|2|2|C|
|2|3|C|
|2|4|B|
|2|5|A|
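To make the intended semantics concrete, here is a plain-Java sketch of what distinctLag should compute over one partition (illustration only, independent of the Hive APIs; the class and method names are made up):
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration: for each row, distinctLag(1) returns the nearest preceding
// value in the partition that differs from the current value, or null when
// no such value exists.
public class DistinctLagSketch {
    static List<Object> distinctLag(List<?> partition) {
        List<Object> out = new ArrayList<Object>();
        for (int i = 0; i < partition.size(); i++) {
            Object curr = partition.get(i);
            Object prevDistinct = null; // no distinct predecessor yet
            for (int j = i - 1; j >= 0; j--) {
                if (!curr.equals(partition.get(j))) {
                    prevDistinct = partition.get(j);
                    break;
                }
            }
            out.add(prevDistinct);
        }
        return out;
    }

    public static void main(String[] args) {
        // session 1 channels in sq order: A B B C B
        // prints [null, A, A, B, C], matching the table above
        System.out.println(distinctLag(Arrays.asList("A", "B", "B", "C", "B")));
    }
}
{code}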
I tried to extend GenericUDFLeadLag and override evaluate():
{code:java}
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;

@Description(
    name = "distinctLag",
    value = "distinctLag (scalar_expression [,offset] [,default]) OVER "
        + "([query_partition_clause] order_by_clause); "
        + "The distinctLag function is used to access data from a distinct previous row.",
    extended = "Example:\n "
        + "select p1.p_mfgr, p1.p_name, p1.p_size,\n"
        + " p1.p_size - distinctLag(p1.p_size,1,p1.p_size) over( distribute by p1.p_mfgr sort by p1.p_name) as deltaSz\n"
        + " from part p1 join part p2 on p1.p_partkey = p2.p_partkey")
@UDFType(impliesOrder = true)
public class GenericUDFDistinctLag extends GenericUDFLeadLag {

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    Object defaultVal = null;
    if (arguments.length == 3) {
      defaultVal = ObjectInspectorUtils.copyToStandardObject(
          getDefaultValueConverter().convert(arguments[2].get()), getDefaultArgOI());
    }
    int idx = getpItr().getIndex() - 1;
    int start = 0;
    int end = getpItr().getPartition().size();
    try {
      // value of the expression on the current row
      Object currValue = ObjectInspectorUtils.copyToStandardObject(
          getExprEvaluator().evaluate(getpItr().resetToIndex(idx)),
          getFirstArgOI(), ObjectInspectorCopyOption.WRITABLE);
      Object ret = null;
      int newIdx = idx;
      do {
        --newIdx;
        if (newIdx >= end || newIdx < start) {
          // ran off the partition boundary: fall back to the default value
          ret = defaultVal;
          return ret;
        } else {
          ret = ObjectInspectorUtils.copyToStandardObject(
              getExprEvaluator().evaluate(getpItr().lag(1)),
              getFirstArgOI(), ObjectInspectorCopyOption.WRITABLE);
          // count down only when a value distinct from the current one is seen
          if (!ret.equals(currValue)) {
            setAmt(getAmt() - 1);
          }
        }
      } while (getAmt() > 0);
      return ret;
    } finally {
      Object currRow = getpItr().resetToIndex(idx);
      // reevaluate expression on current row, to trigger the Lazy object
      // caches to be reset to the current row.
      getExprEvaluator().evaluate(currRow);
    }
  }

  @Override
  protected String _getFnName() {
    return "distinctLag";
  }

  @Override
  protected Object getRow(int amt) throws HiveException {
    throw new HiveException("distinctLag error: cannot call getRow");
  }

  @Override
  protected int getIndex(int amt) {
    // not used by this implementation
    return 0;
  }
}{code}
I packaged it as a jar, added it to Hive, and created a temporary function.
Then I ran:
{color:#14892c}select session,sq,distinctLag(channel) over (partition by session order by sq) from active;{color}
{color:#333333}It reported an error:{color}
{color:#d04437}FAILED: SemanticException Failed to breakup Windowing
invocations into Groups. At least 1 group must only depend on input columns.
Also check for circular dependencies.
Underlying error: Invalid function distinctLag{color}
{color:#333333}I don't know exactly what the problem is. I hope someone can
give me a hint. Thank you.{color}
{color:#333333}Then I noticed that there is a UDAF, GenericUDAFLag, and I
tried to imitate it.{color}
{code:java}
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing;

// annotation values kept as in GenericUDAFLag, which this class imitates
@WindowFunctionDescription(description = @Description(name = "lag",
    value = "_FUNC_(expr, amt, default)"), supportsWindow = false,
    pivotResult = true, impliesOrder = true)
public class GenericUDAFDistinctLag extends GenericUDAFDistinctLeadLag {
  static final Log LOG = LogFactory.getLog(GenericUDAFDistinctLag.class.getName());

  @Override
  protected String functionName() {
    return "Lag";
  }

  @Override
  protected GenericUDAFDistinctLeadLagEvaluator createLLEvaluator() {
    return new GenericUDAFDistinctLagEvaluator();
  }

  public static class GenericUDAFDistinctLagEvaluator extends GenericUDAFDistinctLeadLagEvaluator {
    public GenericUDAFDistinctLagEvaluator() {
    }

    /*
     * used to initialize Streaming Evaluator.
     */
    protected GenericUDAFDistinctLagEvaluator(GenericUDAFDistinctLeadLagEvaluator src) {
      super(src);
    }

    @Override
    protected DistinctLeadLagBuffer getNewLLBuffer() throws HiveException {
      return new DistinctLagBuffer();
    }

    @Override
    public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef wFrmDef) {
      return new GenericUDAFDistinctLagEvaluatorStreaming(this);
    }
  }

  static class DistinctLagBuffer implements DistinctLeadLagBuffer {
    ArrayList<Object> values;
    int lagAmt;
    ArrayList<Object> lagValues;

    @Override
    public void initialize(int lagAmt) {
      this.lagAmt = lagAmt;
      lagValues = new ArrayList<Object>();
      values = new ArrayList<Object>();
    }

    @Override
    public void addRow(Object currValue, Object defaultValue) {
      // scan backwards for the lagAmt-th previous value that differs from
      // currValue; if none exists, use the default value
      int i = values.size() - 1;
      int noEquals = 0;
      for (; i >= 0; i--) {
        if (!currValue.equals(values.get(i))) {
          if (++noEquals == lagAmt) {
            break;
          }
        }
      }
      lagValues.add(i == -1 ? defaultValue : values.get(i));
      values.add(currValue);
    }

    @Override
    public Object terminate() {
      return lagValues;
    }
  }

  /*
   * StreamingEval: wrap regular eval. on getNext remove first row from values
   * and return it.
   */
  static class GenericUDAFDistinctLagEvaluatorStreaming extends GenericUDAFDistinctLagEvaluator
      implements ISupportStreamingModeForWindowing {

    protected GenericUDAFDistinctLagEvaluatorStreaming(GenericUDAFDistinctLeadLagEvaluator src) {
      super(src);
    }

    @Override
    public Object getNextResult(AggregationBuffer agg) throws HiveException {
      DistinctLagBuffer lb = (DistinctLagBuffer) agg;
      if (!lb.lagValues.isEmpty()) {
        Object res = lb.lagValues.remove(0);
        if (res == null) {
          return ISupportStreamingModeForWindowing.NULL_RESULT;
        }
        return res;
      } else if (!lb.values.isEmpty()) {
        Object res = lb.values.remove(0);
        if (res == null) {
          return ISupportStreamingModeForWindowing.NULL_RESULT;
        }
        return res;
      }
      return null;
    }

    @Override
    public int getRowsRemainingAfterTerminate() throws HiveException {
      return getAmt();
    }
  }
}
{code}
{code:java}
import java.lang.reflect.Field;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFLead;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.io.IntWritable;

public abstract class GenericUDAFDistinctLeadLag extends AbstractGenericUDAFResolver {
  static final Log LOG = LogFactory.getLog(GenericUDAFLead.class.getName());

  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo parameters) throws SemanticException {
    ObjectInspector[] paramOIs = parameters.getParameterObjectInspectors();
    String fNm = functionName();
    if (!(paramOIs.length >= 1 && paramOIs.length <= 3)) {
      throw new UDFArgumentTypeException(paramOIs.length - 1,
          "Incorrect invocation of " + fNm + ": _FUNC_(expr, amt, default)");
    }
    int amt = 1;
    if (paramOIs.length > 1) {
      ObjectInspector amtOI = paramOIs[1];
      if (!ObjectInspectorUtils.isConstantObjectInspector(amtOI)
          || (amtOI.getCategory() != ObjectInspector.Category.PRIMITIVE)
          || ((PrimitiveObjectInspector) amtOI).getPrimitiveCategory()
              != PrimitiveObjectInspector.PrimitiveCategory.INT) {
        throw new UDFArgumentTypeException(1, fNm + " amount must be an integer value. "
            + amtOI.getTypeName() + " was passed as parameter 1.");
      }
      Object o = ((ConstantObjectInspector) amtOI).getWritableConstantValue();
      amt = ((IntWritable) o).get();
      if (amt < 0) {
        throw new UDFArgumentTypeException(1, fNm + " amount can not be negative. Specified: " + amt);
      }
    }
    if (paramOIs.length == 3) {
      // validate that the default value is convertible to the expression type
      ObjectInspectorConverters.getConverter(paramOIs[2], paramOIs[0]);
    }
    GenericUDAFDistinctLeadLagEvaluator eval = createLLEvaluator();
    eval.setAmt(amt);
    return eval;
  }

  protected abstract String functionName();

  protected abstract GenericUDAFDistinctLeadLagEvaluator createLLEvaluator();

  public static abstract class GenericUDAFDistinctLeadLagEvaluator extends GenericUDAFEvaluator {
    private transient ObjectInspector[] inputOI;
    private int amt;
    String fnName;
    private transient Converter defaultValueConverter;

    public GenericUDAFDistinctLeadLagEvaluator() {
    }

    /*
     * used to initialize Streaming Evaluator.
     */
    protected GenericUDAFDistinctLeadLagEvaluator(GenericUDAFDistinctLeadLagEvaluator src) {
      this.inputOI = src.inputOI;
      this.amt = src.amt;
      this.fnName = src.fnName;
      this.defaultValueConverter = src.defaultValueConverter;
      try {
        // GenericUDAFEvaluator.mode is not accessible from a subclass in
        // another package, so copy it via reflection
        Field mode = GenericUDAFEvaluator.class.getDeclaredField("mode");
        mode.setAccessible(true);
        mode.set(this, mode.get(src));
        mode.setAccessible(false);
      } catch (IllegalArgumentException | IllegalAccessException
          | NoSuchFieldException | SecurityException e) {
        e.printStackTrace();
      }
    }

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
      super.init(m, parameters);
      if (m != Mode.COMPLETE) {
        throw new HiveException("Only COMPLETE mode supported for " + fnName + " function");
      }
      inputOI = parameters;
      if (parameters.length == 3) {
        defaultValueConverter = ObjectInspectorConverters.getConverter(parameters[2], parameters[0]);
      }
      return ObjectInspectorFactory.getStandardListObjectInspector(
          ObjectInspectorUtils.getStandardObjectInspector(parameters[0]));
    }

    public int getAmt() {
      return amt;
    }

    public void setAmt(int amt) {
      this.amt = amt;
    }

    public String getFnName() {
      return fnName;
    }

    public void setFnName(String fnName) {
      this.fnName = fnName;
    }

    protected abstract DistinctLeadLagBuffer getNewLLBuffer() throws HiveException;

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      DistinctLeadLagBuffer lb = getNewLLBuffer();
      lb.initialize(amt);
      return lb;
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
      ((DistinctLeadLagBuffer) agg).initialize(amt);
    }

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
      Object rowExprVal = ObjectInspectorUtils.copyToStandardObject(parameters[0], inputOI[0]);
      Object defaultVal = parameters.length > 2
          ? ObjectInspectorUtils.copyToStandardObject(
              defaultValueConverter.convert(parameters[2]), inputOI[0])
          : null;
      ((DistinctLeadLagBuffer) agg).addRow(rowExprVal, defaultVal);
    }

    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
      throw new HiveException("terminatePartial not supported");
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
      throw new HiveException("merge not supported");
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
      return ((DistinctLeadLagBuffer) agg).terminate();
    }
  }
}
{code}
{code:java}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;

@SuppressWarnings("deprecation")
interface DistinctLeadLagBuffer extends AggregationBuffer {
  void initialize(int leadAmt);

  void addRow(Object leadExprValue, Object defaultValue);

  Object terminate();
}
{code}
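As a quick sanity check of the buffer logic (a minimal sketch, not part of the UDAF; it assumes the check class is compiled in the same package as the classes above so the package-private DistinctLagBuffer is visible), feeding it the session 1 channels produces the expected output:
{code:java}
// Hypothetical standalone check of DistinctLagBuffer; place it in the same
// package as GenericUDAFDistinctLag.
public class DistinctLagBufferCheck {
    public static void main(String[] args) {
        GenericUDAFDistinctLag.DistinctLagBuffer buf =
                new GenericUDAFDistinctLag.DistinctLagBuffer();
        buf.initialize(1); // lag amount 1, as in distinctLag(channel)
        for (String channel : new String[] { "A", "B", "B", "C", "B" }) {
            buf.addRow(channel, null); // null default, like plain lag()
        }
        // prints [null, A, A, B, C] for session 1
        System.out.println(buf.terminate());
    }
}
{code}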
Again I packaged it as a jar, added it to Hive, and created a temporary
function. In higher Hive versions it works, but in {color:#14892c}Hive 1.1.0{color}
it reported an error. {color:#333333}(Also, in Hive 1.1.0, if I create that
temporary function named lag or lead, it works as I want, but it shadows Hive's
built-in lag/lead even after the temporary function is dropped; only after I
quit hive-cli and re-enter it do the built-in lag/lead work again.){color}:
{code:java}
hive> SELECT session,sq,distinctLag(channel)over(PARTITION BY session ORDER BY
sq) FROM elephant_active;
Query ID = root_20190131195959_8047b4ba-a85c-4f39-8a27-989388316c50
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1546504603780_0492, Tracking URL =
http://dce.bi.com:8088/proxy/application_1546504603780_0492/
Kill Command =
/opt/cloudera/parcels/CDH-5.10.2-1.cdh5.10.2.p0.5/lib/hadoop/bin/hadoop job
-kill job_1546504603780_0492
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-01-31 20:00:03,639 Stage-1 map = 0%, reduce = 0%
2019-01-31 20:00:09,972 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.38
sec
2019-01-31 20:00:30,795 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 1.38
sec
MapReduce Total cumulative CPU time: 1 seconds 380 msec
Ended Job = job_1546504603780_0492 with errors
Error during job, obtaining debugging information...
Examining task ID: task_1546504603780_0492_m_000000 (and more) from job
job_1546504603780_0492
Task with the most failures(4):
-----
Task ID:
task_1546504603780_0492_r_000000
URL:
http://0.0.0.0:8088/taskdetails.jsp?jobid=job_1546504603780_0492&tipid=task_1546504603780_0492_r_000000
-----
Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: Error in configuring object
at
org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
at
org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at
org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:409)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
... 9 more
Caused by: java.lang.RuntimeException: Reduce operator initialization failed
at
org.apache.hadoop.hive.ql.exec.mr.ExecReducer.configure(ExecReducer.java:166)
... 14 more
Caused by: java.lang.NullPointerException
at
org.apache.hadoop.hive.ql.exec.Registry.getFunctionInfo(Registry.java:306)
at
org.apache.hadoop.hive.ql.exec.Registry.getWindowFunctionInfo(Registry.java:314)
at
org.apache.hadoop.hive.ql.exec.FunctionRegistry.getWindowFunctionInfo(FunctionRegistry.java:504)
at
org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.streamingPossible(WindowingTableFunction.java:151)
at
org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.setCanAcceptInputAsStream(WindowingTableFunction.java:222)
at
org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.initializeStreaming(WindowingTableFunction.java:256)
at
org.apache.hadoop.hive.ql.exec.PTFOperator$PTFInvocation.initializeStreaming(PTFOperator.java:291)
at
org.apache.hadoop.hive.ql.exec.PTFOperator.initializeOp(PTFOperator.java:86)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:469)
at
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:425)
at
org.apache.hadoop.hive.ql.exec.SelectOperator.initializeOp(SelectOperator.java:65)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
at
org.apache.hadoop.hive.ql.exec.mr.ExecReducer.configure(ExecReducer.java:159)
... 14 more
FAILED: Execution Error, return code 2 from
org.apache.hadoop.hive.ql.exec.mr.MapRedTask
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 1.38 sec HDFS Read: 2958
HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 1 seconds 380 msec
hive>
{code}
I guess it is a FunctionRegistry problem.
I am a beginner. I hope someone can tell me the correct way to implement this
special function. Thank you very much. I use Hive 1.1.0 + cdh5.10.2 + 945.
{code:java}
<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>
<dependencies>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.1.0-cdh5.10.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.9</version>
    <scope>test</scope>
  </dependency>
</dependencies>
{code}
> I want to extend the lag/lead functions to implement some special functions,
> and I met some problems
> -----------------------------------------------------------------------------------------------------
>
> Key: HIVE-21191
> URL: https://issues.apache.org/jira/browse/HIVE-21191
> Project: Hive
> Issue Type: Wish
> Components: Hive, UDF, Windows
> Affects Versions: 1.1.0
> Reporter: one
> Priority: Minor
> Labels: LAG(), UDAF, UDF, window_function
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)