http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/hive/executor/UdfExecutor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/hive/executor/UdfExecutor.java b/fe/src/main/java/com/cloudera/impala/hive/executor/UdfExecutor.java deleted file mode 100644 index 242c704..0000000 --- a/fe/src/main/java/com/cloudera/impala/hive/executor/UdfExecutor.java +++ /dev/null @@ -1,643 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.hive.executor; - -import java.io.File; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; - -import org.apache.hadoop.hive.ql.exec.UDF; -import org.apache.hadoop.hive.serde2.io.ByteWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.log4j.Logger; -import org.apache.thrift.protocol.TBinaryProtocol; - -import com.cloudera.impala.catalog.Type; -import com.cloudera.impala.catalog.PrimitiveType; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.common.ImpalaRuntimeException; -import com.cloudera.impala.common.JniUtil; -import com.cloudera.impala.thrift.THiveUdfExecutorCtorParams; -import com.cloudera.impala.thrift.TPrimitiveType; -import com.cloudera.impala.util.UnsafeUtil; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -// Wrapper object to run hive UDFs. This class works with UdfCallExpr in the -// backend to marshall data back and forth between the execution engine and -// the java UDF class. -// See the comments in be/src/exprs/hive-udf-call.h for more details. -// TODO: should we cache loaded jars and classes? -@SuppressWarnings("restriction") -public class UdfExecutor { - private static final Logger LOG = Logger.getLogger(UdfExecutor.class); - // By convention, the function in the class must be called evaluate() - public static final String UDF_FUNCTION_NAME = "evaluate"; - - // Object to deserialize ctor params from BE. 
- private final static TBinaryProtocol.Factory PROTOCOL_FACTORY = - new TBinaryProtocol.Factory(); - - private UDF udf_; - private Method method_; - - // Return and argument types of the function inferred from the udf method signature. - // The JavaUdfDataType enum maps it to corresponding primitive type. - private JavaUdfDataType[] argTypes_; - private JavaUdfDataType retType_; - - // Input buffer from the backend. This is valid for the duration of an evaluate() call. - // These buffers are allocated in the BE. - private final long inputBufferPtr_; - private final long inputNullsPtr_; - - // This is the byte offset in inputBufferPtr to the start of the input argument. - // e.g. *inputBufferPtr_[inputBufferOffsets[i]] is the ith input argument. - private final int[] inputBufferOffsets_; - - // Output buffer to return non-string values. This buffers are allocated in the BE. - private final long outputBufferPtr_; - private final long outputNullPtr_; - - // For StringValue return types, outputBufferPtr_ is the location of the 16-byte - // StringValue object. StringValue.ptr is set to outBufferStringPtr_. This buffer - // grows as necessary to fit the return string. - // This is allocated from the FE. - private long outBufferStringPtr_; - - // Size of outBufferStringPtr_. - private int outBufferCapacity_; - - // Preconstructed input objects for the UDF. This minimizes object creation overhead - // as these objects are reused across calls to evaluate(). - private Object[] inputObjects_; - private Object[] inputArgs_; // inputArgs_[i] is either inputObjects_[i] or null - - // Data types that are supported as return or argument types in Java UDFs. - public enum JavaUdfDataType { - INVALID_TYPE("INVALID_TYPE", TPrimitiveType.INVALID_TYPE), - BOOLEAN("BOOLEAN", TPrimitiveType.BOOLEAN), - BOOLEAN_WRITABLE("BOOLEAN_WRITABLE", TPrimitiveType.BOOLEAN), - TINYINT("TINYINT", TPrimitiveType.TINYINT), - BYTE_WRITABLE("BYTE_WRITABLE", TPrimitiveType.TINYINT), - SMALLINT("SMALLINT", TPrimitiveType.SMALLINT), - SHORT_WRITABLE("SHORT_WRITABLE", TPrimitiveType.SMALLINT), - INT("INT", TPrimitiveType.INT), - INT_WRITABLE("INT_WRITABLE", TPrimitiveType.INT), - BIGINT("BIGINT", TPrimitiveType.BIGINT), - LONG_WRITABLE("LONG_WRITABLE", TPrimitiveType.BIGINT), - FLOAT("FLOAT", TPrimitiveType.FLOAT), - FLOAT_WRITABLE("FLOAT_WRITABLE", TPrimitiveType.FLOAT), - DOUBLE("DOUBLE", TPrimitiveType.DOUBLE), - DOUBLE_WRITABLE("DOUBLE", TPrimitiveType.DOUBLE), - STRING("STRING", TPrimitiveType.STRING), - TEXT("TEXT", TPrimitiveType.STRING), - BYTES_WRITABLE("BYTES_WRITABLE", TPrimitiveType.STRING), - BYTE_ARRAY("BYTE_ARRAY", TPrimitiveType.STRING); - - private final String description_; - private final TPrimitiveType thriftType_; - - private JavaUdfDataType(String description, TPrimitiveType thriftType) { - description_ = description; - thriftType_ = thriftType; - } - - @Override - public String toString() { return description_; } - - public TPrimitiveType getPrimitiveType() { return thriftType_; } - - public static JavaUdfDataType getType(Class<?> c) { - if (c == BooleanWritable.class) { - return JavaUdfDataType.BOOLEAN_WRITABLE; - } else if (c == boolean.class || c == Boolean.class) { - return JavaUdfDataType.BOOLEAN; - } else if (c == ByteWritable.class) { - return JavaUdfDataType.BYTE_WRITABLE; - } else if (c == byte.class || c == Byte.class) { - return JavaUdfDataType.TINYINT; - } else if (c == ShortWritable.class) { - return JavaUdfDataType.SHORT_WRITABLE; - } else if (c == short.class || c == Short.class) { - return 
JavaUdfDataType.SMALLINT; - } else if (c == IntWritable.class) { - return JavaUdfDataType.INT_WRITABLE; - } else if (c == int.class || c == Integer.class) { - return JavaUdfDataType.INT; - } else if (c == LongWritable.class) { - return JavaUdfDataType.LONG_WRITABLE; - } else if (c == long.class || c == Long.class) { - return JavaUdfDataType.BIGINT; - } else if (c == FloatWritable.class) { - return JavaUdfDataType.FLOAT_WRITABLE; - } else if (c == float.class || c == Float.class) { - return JavaUdfDataType.FLOAT; - } else if (c == DoubleWritable.class) { - return JavaUdfDataType.DOUBLE_WRITABLE; - } else if (c == double.class || c == Double.class) { - return JavaUdfDataType.DOUBLE; - } else if (c == byte[].class) { - return JavaUdfDataType.BYTE_ARRAY; - } else if (c == BytesWritable.class) { - return JavaUdfDataType.BYTES_WRITABLE; - } else if (c == Text.class) { - return JavaUdfDataType.TEXT; - } else if (c == String.class) { - return JavaUdfDataType.STRING; - } - return JavaUdfDataType.INVALID_TYPE; - } - - public static boolean isSupported(Type t) { - for(JavaUdfDataType javaType: JavaUdfDataType.values()) { - if (javaType == JavaUdfDataType.INVALID_TYPE) continue; - if (javaType.getPrimitiveType() == t.getPrimitiveType().toThrift()) { - return true; - } - } - return false; - } - } - - /** - * Create a UdfExecutor, using parameters from a serialized thrift object. Used by - * the backend. - */ - public UdfExecutor(byte[] thriftParams) throws ImpalaException { - THiveUdfExecutorCtorParams request = new THiveUdfExecutorCtorParams(); - JniUtil.deserializeThrift(PROTOCOL_FACTORY, request, thriftParams); - - String className = request.fn.scalar_fn.symbol; - String jarFile = request.local_location; - Type retType = Type.fromThrift(request.fn.ret_type); - Type[] parameterTypes = new Type[request.fn.arg_types.size()]; - for (int i = 0; i < request.fn.arg_types.size(); ++i) { - parameterTypes[i] = Type.fromThrift(request.fn.arg_types.get(i)); - } - inputBufferPtr_ = request.input_buffer_ptr; - inputNullsPtr_ = request.input_nulls_ptr; - outputBufferPtr_ = request.output_buffer_ptr; - outputNullPtr_ = request.output_null_ptr; - outBufferStringPtr_ = 0; - outBufferCapacity_ = 0; - inputBufferOffsets_ = new int[request.input_byte_offsets.size()]; - for (int i = 0; i < request.input_byte_offsets.size(); ++i) { - inputBufferOffsets_[i] = request.input_byte_offsets.get(i).intValue(); - } - - init(jarFile, className, retType, parameterTypes); - } - - @Override - protected void finalize() throws Throwable { - close(); - super.finalize(); - } - - /** - * Releases any resources allocated off the native heap. - */ - public void close() { - UnsafeUtil.UNSAFE.freeMemory(outBufferStringPtr_); - outBufferStringPtr_ = 0; - outBufferCapacity_ = 0; - } - - /** - * evaluate function called by the backend. 
The inputs to the UDF have - * been serialized to 'input' - */ - public void evaluate() throws ImpalaRuntimeException { - try { - for (int i = 0; i < argTypes_.length; ++i) { - if (UnsafeUtil.UNSAFE.getByte(inputNullsPtr_ + i) == 0) { - switch (argTypes_[i]) { - case BOOLEAN_WRITABLE: - case BYTE_WRITABLE: - case SHORT_WRITABLE: - case INT_WRITABLE: - case LONG_WRITABLE: - case FLOAT_WRITABLE: - case DOUBLE_WRITABLE: - case BYTE_ARRAY: - case BYTES_WRITABLE: - case TEXT: - inputArgs_[i] = inputObjects_[i]; - break; - case BOOLEAN: - inputArgs_[i] = ((ImpalaBooleanWritable)inputObjects_[i]).get(); - break; - case TINYINT: - inputArgs_[i] = ((ImpalaTinyIntWritable)inputObjects_[i]).get(); - break; - case SMALLINT: - inputArgs_[i] = ((ImpalaSmallIntWritable)inputObjects_[i]).get(); - break; - case INT: - inputArgs_[i] = ((ImpalaIntWritable)inputObjects_[i]).get(); - break; - case BIGINT: - inputArgs_[i] = ((ImpalaBigIntWritable)inputObjects_[i]).get(); - break; - case FLOAT: - inputArgs_[i] = ((ImpalaFloatWritable)inputObjects_[i]).get(); - break; - case DOUBLE: - inputArgs_[i] = ((ImpalaDoubleWritable)inputObjects_[i]).get(); - break; - case STRING: - Preconditions.checkState(inputObjects_[i] instanceof ImpalaBytesWritable); - inputArgs_[i] = - new String(((ImpalaBytesWritable)inputObjects_[i]).getBytes()); - break; - } - } else { - inputArgs_[i] = null; - } - } - evaluate(inputArgs_); - } catch (Exception e) { - e.printStackTrace(System.err); - throw new ImpalaRuntimeException("UDF::evaluate() ran into a problem.", e); - } - } - - /** - * Evalutes the UDF with 'args' as the input to the UDF. This is exposed - * for testing and not the version of evaluate() the backend uses. - */ - public long evaluateForTesting(Object... args) throws ImpalaRuntimeException { - try { - Object[] inputArgs = new Object[args.length]; - for (int i = 0; i < args.length; ++i) { - switch (argTypes_[i]) { - case BOOLEAN_WRITABLE: - case BYTE_WRITABLE: - case SHORT_WRITABLE: - case INT_WRITABLE: - case LONG_WRITABLE: - case FLOAT_WRITABLE: - case DOUBLE_WRITABLE: - case TEXT: - case BYTE_ARRAY: - case BYTES_WRITABLE: - case STRING: - inputArgs[i] = args[i]; - break; - case BOOLEAN: - inputArgs[i] = ((ImpalaBooleanWritable)args[i]).get(); - break; - case TINYINT: - inputArgs[i] = ((ImpalaTinyIntWritable)args[i]).get(); - break; - case SMALLINT: - inputArgs[i] = ((ImpalaSmallIntWritable)args[i]).get(); - break; - case INT: - inputArgs[i] = ((ImpalaIntWritable)args[i]).get(); - break; - case BIGINT: - inputArgs[i] = ((ImpalaBigIntWritable)args[i]).get(); - break; - case FLOAT: - inputArgs[i] = ((ImpalaFloatWritable)args[i]).get(); - break; - case DOUBLE: - inputArgs[i] = ((ImpalaDoubleWritable)args[i]).get(); - break; - } - } - return evaluate(inputArgs); - } catch (Exception e) { - e.printStackTrace(System.err); - throw new ImpalaRuntimeException("UDF::evaluate() ran into a problem.", e); - } - } - - /** - * Evalutes the UDF with 'args' as the input to the UDF. - * Returns 0 if the udf returned NULL. (the result is a ptr so this is okay). - */ - private long evaluate(Object... 
args) throws ImpalaRuntimeException { - try { - storeUdfResult(method_.invoke(udf_, args)); - if (UnsafeUtil.UNSAFE.getByte(outputNullPtr_) == 1) return 0; - return outputBufferPtr_; - } catch (IllegalArgumentException e) { - throw new ImpalaRuntimeException("UDF failed to evaluate", e); - } catch (IllegalAccessException e) { - throw new ImpalaRuntimeException("UDF failed to evaluate", e); - } catch (InvocationTargetException e) { - throw new ImpalaRuntimeException("UDF failed to evaluate", e); - } - } - - public Method getMethod() { return method_; } - - // Sets the result object 'obj' into the outputBufferPtr_ - private void storeUdfResult(Object obj) throws ImpalaRuntimeException { - if (obj == null) { - UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)1); - return; - } - - UnsafeUtil.UNSAFE.putByte(outputNullPtr_, (byte)0); - switch (retType_) { - case BOOLEAN_WRITABLE: { - BooleanWritable val = (BooleanWritable)obj; - UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get() ? (byte)1 : 0); - return; - } - case BOOLEAN: { - UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, (boolean)obj ? (byte)1 : 0); - return; - } - case BYTE_WRITABLE: { - ByteWritable val = (ByteWritable)obj; - UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, val.get()); - return; - } - case TINYINT: { - UnsafeUtil.UNSAFE.putByte(outputBufferPtr_, (byte)obj); - return; - } - case SHORT_WRITABLE: { - ShortWritable val = (ShortWritable)obj; - UnsafeUtil.UNSAFE.putShort(outputBufferPtr_, val.get()); - return; - } - case SMALLINT: { - UnsafeUtil.UNSAFE.putShort(outputBufferPtr_, (short)obj); - return; - } - case INT_WRITABLE: { - IntWritable val = (IntWritable)obj; - UnsafeUtil.UNSAFE.putInt(outputBufferPtr_, val.get()); - return; - } - case INT: { - UnsafeUtil.UNSAFE.putInt(outputBufferPtr_, (int)obj); - return; - } - case LONG_WRITABLE: { - LongWritable val = (LongWritable)obj; - UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, val.get()); - return; - } - case BIGINT: { - UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, (long)obj); - return; - } - case FLOAT_WRITABLE: { - FloatWritable val = (FloatWritable)obj; - UnsafeUtil.UNSAFE.putFloat(outputBufferPtr_, val.get()); - return; - } - case FLOAT: { - UnsafeUtil.UNSAFE.putFloat(outputBufferPtr_, (float)obj); - return; - } - case DOUBLE_WRITABLE: { - DoubleWritable val = (DoubleWritable)obj; - UnsafeUtil.UNSAFE.putDouble(outputBufferPtr_, val.get()); - return; - } - case DOUBLE: { - UnsafeUtil.UNSAFE.putDouble(outputBufferPtr_, (double)obj); - return; - } - case TEXT: { - copyBytesToOutputBuffer(((Text)obj).copyBytes()); - return; - } - case BYTE_ARRAY: { - copyBytesToOutputBuffer((byte[]) obj); - return; - } - case BYTES_WRITABLE: { - copyBytesToOutputBuffer(((BytesWritable)obj).copyBytes()); - return; - } - case STRING: { - copyBytesToOutputBuffer(((String)obj).getBytes()); - return; - } - default: - throw new ImpalaRuntimeException("Unsupported return type: " + retType_); - } - } - - private void copyBytesToOutputBuffer(byte[] bytes) { - if (bytes.length > outBufferCapacity_) { - outBufferStringPtr_ = - UnsafeUtil.UNSAFE.reallocateMemory(outBufferStringPtr_, bytes.length); - outBufferCapacity_ = bytes.length; - UnsafeUtil.UNSAFE.putLong(outputBufferPtr_, outBufferStringPtr_); - } - UnsafeUtil.Copy(outBufferStringPtr_, bytes, 0, bytes.length); - UnsafeUtil.UNSAFE.putInt( - outputBufferPtr_ + ImpalaStringWritable.STRING_VALUE_LEN_OFFSET, - bytes.length); - } - - // Preallocate the input objects that will be passed to the underlying UDF. 
- // These objects are allocated once and reused across calls to evaluate() - private void allocateInputObjects() throws ImpalaRuntimeException { - inputObjects_ = new Writable[argTypes_.length]; - inputArgs_ = new Object[argTypes_.length]; - - for (int i = 0; i < argTypes_.length; ++i) { - int offset = inputBufferOffsets_[i]; - switch (argTypes_[i]) { - case BOOLEAN: - case BOOLEAN_WRITABLE: - inputObjects_[i] = new ImpalaBooleanWritable(inputBufferPtr_ + offset); - break; - case TINYINT: - case BYTE_WRITABLE: - inputObjects_[i] = new ImpalaTinyIntWritable(inputBufferPtr_ + offset); - break; - case SMALLINT: - case SHORT_WRITABLE: - inputObjects_[i] = new ImpalaSmallIntWritable(inputBufferPtr_ + offset); - break; - case INT: - case INT_WRITABLE: - inputObjects_[i] = new ImpalaIntWritable(inputBufferPtr_ + offset); - break; - case BIGINT: - case LONG_WRITABLE: - inputObjects_[i] = new ImpalaBigIntWritable(inputBufferPtr_ + offset); - break; - case FLOAT: - case FLOAT_WRITABLE: - inputObjects_[i] = new ImpalaFloatWritable(inputBufferPtr_ + offset); - break; - case DOUBLE: - case DOUBLE_WRITABLE: - inputObjects_[i] = new ImpalaDoubleWritable(inputBufferPtr_ + offset); - break; - case TEXT: - inputObjects_[i] = new ImpalaTextWritable(inputBufferPtr_ + offset); - break; - case BYTES_WRITABLE: - inputObjects_[i] = new ImpalaBytesWritable(inputBufferPtr_ + offset); - break; - case STRING: - // String can be mapped to any String-like Writable class. - inputObjects_[i] = new ImpalaBytesWritable(inputBufferPtr_ + offset); - break; - default: - throw new ImpalaRuntimeException("Unsupported argument type: " + argTypes_[i]); - } - } - } - - private ClassLoader getClassLoader(String jarPath) throws MalformedURLException { - if (jarPath == null) { - return ClassLoader.getSystemClassLoader(); - } else { - URL url = new File(jarPath).toURI().toURL(); - return URLClassLoader.newInstance(new URL[] { url }, getClass().getClassLoader()); - } - } - - /** - * Sets the return type of a Java UDF. Returns true if the return type is compatible - * with the return type from the function definition. Throws an ImpalaRuntimeException - * if the return type is not supported. - */ - private boolean setReturnType(Type retType, Class<?> udfReturnType) - throws ImpalaRuntimeException { - if (!JavaUdfDataType.isSupported(retType)) { - throw new ImpalaRuntimeException("Unsupported return type: " + retType.toSql()); - } - JavaUdfDataType javaType = JavaUdfDataType.getType(udfReturnType); - // Check if the evaluate method return type is compatible with the return type from - // the function definition. This happens when both of them map to the same primitive - // type. - if (retType.getPrimitiveType().toThrift() != javaType.getPrimitiveType()) { - return false; - } - retType_ = javaType; - return true; - } - - /** - * Sets the argument types of a Java UDF. Returns true if the argument types specified - * in the UDF are compatible with the argument types of the evaluate() function loaded - * from the associated JAR file. - */ - private boolean setArgTypes(Type[] parameterTypes, Class<?>[] udfArgTypes) { - Preconditions.checkNotNull(argTypes_); - for (int i = 0; i < udfArgTypes.length; ++i) { - argTypes_[i] = JavaUdfDataType.getType(udfArgTypes[i]); - if (argTypes_[i].getPrimitiveType() - != parameterTypes[i].getPrimitiveType().toThrift()) { - return false; - } - } - return true; - } - - /** - * Initializes the UdfExecutor validating the UDF has the proper signature. 
* This uses reflection to look up the "evaluate" function in the UDF class. - */ - private void init(String jarPath, String udfPath, - Type retType, Type... parameterTypes) throws - ImpalaRuntimeException { - ArrayList<String> signatures = Lists.newArrayList(); - try { - LOG.debug("Loading UDF '" + udfPath + "' from " + jarPath); - ClassLoader loader = getClassLoader(jarPath); - Class<?> c = Class.forName(udfPath, true, loader); - Class<? extends UDF> udfClass = c.asSubclass(UDF.class); - Constructor<? extends UDF> ctor = udfClass.getConstructor(); - udf_ = ctor.newInstance(); - argTypes_ = new JavaUdfDataType[parameterTypes.length]; - Method[] methods = udfClass.getMethods(); - for (Method m: methods) { - // By convention, the udf must contain the function "evaluate" - if (!m.getName().equals(UDF_FUNCTION_NAME)) continue; - signatures.add(m.toGenericString()); - Class<?>[] methodTypes = m.getParameterTypes(); - - // Try to match the arguments - if (methodTypes.length != parameterTypes.length) continue; - if (methodTypes.length == 0 && parameterTypes.length == 0) { - // Special case where the UDF doesn't take any input args - method_ = m; - if (!setReturnType(retType, m.getReturnType())) continue; - setArgTypes(parameterTypes, methodTypes); - LOG.debug("Loaded UDF '" + udfPath + "' from " + jarPath); - return; - } - - method_ = m; - if (!setReturnType(retType, m.getReturnType())) continue; - if (!setArgTypes(parameterTypes, methodTypes)) continue; - allocateInputObjects(); - LOG.debug("Loaded UDF '" + udfPath + "' from " + jarPath); - return; - } - - StringBuilder sb = new StringBuilder(); - sb.append("Unable to find evaluate function with the correct signature: ") - .append(udfPath + ".evaluate(") - .append(Joiner.on(", ").join(parameterTypes)) - .append(")\n") - .append("UDF contains: \n ") - .append(Joiner.on("\n ").join(signatures)); - throw new ImpalaRuntimeException(sb.toString()); - } catch (MalformedURLException e) { - throw new ImpalaRuntimeException("Unable to load jar.", e); - } catch (SecurityException e) { - throw new ImpalaRuntimeException("Unable to load function.", e); - } catch (ClassNotFoundException e) { - throw new ImpalaRuntimeException("Unable to find class.", e); - } catch (NoSuchMethodException e) { - throw new ImpalaRuntimeException( - "Unable to find constructor with no arguments.", e); - } catch (IllegalArgumentException e) { - throw new ImpalaRuntimeException( - "Unable to call UDF constructor with no arguments.", e); - } catch (InstantiationException e) { - throw new ImpalaRuntimeException("Unable to create UDF instance.", e); - } catch (IllegalAccessException e) { - throw new ImpalaRuntimeException("Unable to create UDF instance.", e); - } catch (InvocationTargetException e) { - throw new ImpalaRuntimeException("Unable to create UDF instance.", e); - } - } -}
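[Editor's note: the example below is not part of the commit above. It is a minimal sketch of the kind of Hive UDF that UdfExecutor loads and resolves by reflection; the package, class name, and logic are hypothetical, and only the org.apache.hadoop.hive.ql.exec.UDF base class, the Hadoop Writable types, and the "evaluate" naming convention referenced in the deleted code are assumed.]

    package com.example.hiveudf;                         // hypothetical package

    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    // UdfExecutor.init() scans the class for public methods named "evaluate"
    // (UDF_FUNCTION_NAME) and picks the overload whose parameter and return
    // types map to the catalog signature via JavaUdfDataType.getType().
    public class StringLengthUdf extends UDF {
      // Text/IntWritable map to STRING and INT; this overload returns NULL
      // for a NULL input.
      public IntWritable evaluate(Text s) {
        if (s == null) return null;
        return new IntWritable(s.getLength());
      }

      // Primitive and boxed Java types are also accepted (e.g. INT -> int).
      public int evaluate(int x) {
        return x + 1;
      }
    }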
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java b/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java deleted file mode 100644 index f6bf8a0..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java +++ /dev/null @@ -1,292 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.AggregateInfo; -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.FunctionCallExpr; -import com.cloudera.impala.analysis.SlotId; -import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.thrift.TAggregationNode; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TExpr; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TPlanNodeType; -import com.cloudera.impala.thrift.TQueryOptions; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -/** - * Aggregation computation. - * - */ -public class AggregationNode extends PlanNode { - private final static Logger LOG = LoggerFactory.getLogger(AggregationNode.class); - - // Default per-host memory requirement used if no valid stats are available. - // TODO: Come up with a more useful heuristic. - private final static long DEFAULT_PER_HOST_MEM = 128L * 1024L * 1024L; - - // Conservative minimum size of hash table for low-cardinality aggregations. - private final static long MIN_HASH_TBL_MEM = 10L * 1024L * 1024L; - - private final AggregateInfo aggInfo_; - - // Set to true if this aggregation node needs to run the Finalize step. This - // node is the root node of a distributed aggregation. - private boolean needsFinalize_; - - // If true, use streaming preaggregation algorithm. Not valid if this is a merge agg. - private boolean useStreamingPreagg_; - - /** - * Create an agg node from aggInfo. - */ - public AggregationNode(PlanNodeId id, PlanNode input, AggregateInfo aggInfo) { - super(id, aggInfo.getOutputTupleId().asList(), "AGGREGATE"); - aggInfo_ = aggInfo; - children_.add(input); - needsFinalize_ = true; - } - - /** - * Copy c'tor used in clone(). 
- */ - private AggregationNode(PlanNodeId id, AggregationNode src) { - super(id, src, "AGGREGATE"); - aggInfo_ = src.aggInfo_; - needsFinalize_ = src.needsFinalize_; - } - - public AggregateInfo getAggInfo() { return aggInfo_; } - - /** - * Unsets this node as requiring finalize. Only valid to call this if it is - * currently marked as needing finalize. - */ - public void unsetNeedsFinalize() { - Preconditions.checkState(needsFinalize_); - needsFinalize_ = false; - } - - /** - * Sets this node as a preaggregation. Only valid to call this if it is not marked - * as a preaggregation - */ - public void setIsPreagg(PlannerContext ctx_) { - TQueryOptions query_options = ctx_.getQueryOptions(); - useStreamingPreagg_ = !query_options.disable_streaming_preaggregations && - aggInfo_.getGroupingExprs().size() > 0; - } - - /** - * Have this node materialize the aggregation's intermediate tuple instead of - * the output tuple. - */ - public void setIntermediateTuple() { - Preconditions.checkState(!tupleIds_.isEmpty()); - Preconditions.checkState(tupleIds_.get(0).equals(aggInfo_.getOutputTupleId())); - tupleIds_.clear(); - tupleIds_.add(aggInfo_.getIntermediateTupleId()); - } - - @Override - public boolean isBlockingNode() { return !useStreamingPreagg_; } - - @Override - public void init(Analyzer analyzer) throws InternalException { - // Assign predicates to the top-most agg in the single-node plan that can evaluate - // them, as follows: For non-distinct aggs place them in the 1st phase agg node. For - // distinct aggs place them in the 2nd phase agg node. The conjuncts are - // transferred to the proper place in the multi-node plan via transferConjuncts(). - if (tupleIds_.get(0).equals(aggInfo_.getResultTupleId()) && !aggInfo_.isMerge()) { - // Ignore predicates bound by a grouping slot produced by a SlotRef grouping expr. - // Those predicates are already evaluated below this agg node (e.g., in a scan), - // because the grouping slot must be in the same equivalence class as another slot - // below this agg node. We must not ignore other grouping slots in order to retain - // conjuncts bound by those grouping slots in createEquivConjuncts() (IMPALA-2089). - // Those conjuncts cannot be redundant because our equivalence classes do not - // capture dependencies with non-SlotRef exprs. - Set<SlotId> groupBySlots = Sets.newHashSet(); - for (int i = 0; i < aggInfo_.getGroupingExprs().size(); ++i) { - if (aggInfo_.getGroupingExprs().get(i).unwrapSlotRef(true) == null) continue; - groupBySlots.add(aggInfo_.getOutputTupleDesc().getSlots().get(i).getId()); - } - ArrayList<Expr> bindingPredicates = - analyzer.getBoundPredicates(tupleIds_.get(0), groupBySlots, true); - conjuncts_.addAll(bindingPredicates); - - // also add remaining unassigned conjuncts_ - assignConjuncts(analyzer); - - analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_, groupBySlots); - } - conjuncts_ = orderConjunctsByCost(conjuncts_); - // Compute the mem layout for both tuples here for simplicity. 
- aggInfo_.getOutputTupleDesc().computeMemLayout(); - aggInfo_.getIntermediateTupleDesc().computeMemLayout(); - - // do this at the end so it can take all conjuncts into account - computeStats(analyzer); - - // don't call createDefaultSMap(), it would point our conjuncts (= Having clause) - // to our input; our conjuncts don't get substituted because they already - // refer to our output - outputSmap_ = getCombinedChildSmap(); - aggInfo_.substitute(outputSmap_, analyzer); - // assert consistent aggregate expr and slot materialization - aggInfo_.checkConsistency(); - } - - @Override - public void computeStats(Analyzer analyzer) { - super.computeStats(analyzer); - // This is prone to overflow, because we keep multiplying cardinalities, - // even if the grouping exprs are functionally dependent (example: - // group by the primary key of a table plus a number of other columns from that - // same table) - // TODO: try to recognize functional dependencies - // TODO: as a shortcut, instead of recognizing functional dependencies, - // limit the contribution of a single table to the number of rows - // of that table (so that when we're grouping by the primary key col plus - // some others, the estimate doesn't overshoot dramatically) - // cardinality: product of # of distinct values produced by grouping exprs - - // Any non-grouping aggregation has at least one distinct value - cardinality_ = aggInfo_.getGroupingExprs().isEmpty() ? 1 : - Expr.getNumDistinctValues(aggInfo_.getGroupingExprs()); - // take HAVING predicate into account - LOG.trace("Agg: cardinality=" + Long.toString(cardinality_)); - if (cardinality_ > 0) { - cardinality_ = Math.round((double) cardinality_ * computeSelectivity()); - LOG.trace("sel=" + Double.toString(computeSelectivity())); - } - // if we ended up with an overflow, the estimate is certain to be wrong - if (cardinality_ < 0) cardinality_ = -1; - // Sanity check the cardinality_ based on the input cardinality_. - if (getChild(0).getCardinality() != -1) { - if (cardinality_ == -1) { - // A worst-case cardinality_ is better than an unknown cardinality_. - cardinality_ = getChild(0).getCardinality(); - } else { - // An AggregationNode cannot increase the cardinality_. 
- cardinality_ = Math.min(getChild(0).getCardinality(), cardinality_); - } - } - cardinality_ = capAtLimit(cardinality_); - LOG.trace("stats Agg: cardinality=" + Long.toString(cardinality_)); - } - - @Override - protected String debugString() { - return Objects.toStringHelper(this) - .add("aggInfo", aggInfo_.debugString()) - .addValue(super.debugString()) - .toString(); - } - - @Override - protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.AGGREGATION_NODE; - - List<TExpr> aggregateFunctions = Lists.newArrayList(); - // only serialize agg exprs that are being materialized - for (FunctionCallExpr e: aggInfo_.getMaterializedAggregateExprs()) { - aggregateFunctions.add(e.treeToThrift()); - } - aggInfo_.checkConsistency(); - msg.agg_node = new TAggregationNode( - aggregateFunctions, - aggInfo_.getIntermediateTupleId().asInt(), - aggInfo_.getOutputTupleId().asInt(), needsFinalize_, - useStreamingPreagg_, - getChild(0).getCardinality()); - List<Expr> groupingExprs = aggInfo_.getGroupingExprs(); - if (groupingExprs != null) { - msg.agg_node.setGrouping_exprs(Expr.treesToThrift(groupingExprs)); - } - } - - @Override - protected String getDisplayLabelDetail() { - if (useStreamingPreagg_) return "STREAMING"; - if (needsFinalize_) return "FINALIZE"; - return null; - } - - @Override - protected String getNodeExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - String nameDetail = getDisplayLabelDetail(); - output.append(String.format("%s%s", prefix, getDisplayLabel())); - if (nameDetail != null) output.append(" [" + nameDetail + "]"); - output.append("\n"); - - if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { - ArrayList<FunctionCallExpr> aggExprs = aggInfo_.getMaterializedAggregateExprs(); - if (!aggExprs.isEmpty()) { - output.append(detailPrefix + "output: ") - .append(getExplainString(aggExprs) + "\n"); - } - // TODO: is this the best way to display this. It currently would - // have DISTINCT_PC(DISTINCT_PC(col)) for the merge phase but not - // very obvious what that means if you don't already know. - - // TODO: group by can be very long. Break it into multiple lines - if (!aggInfo_.getGroupingExprs().isEmpty()) { - output.append(detailPrefix + "group by: ") - .append(getExplainString(aggInfo_.getGroupingExprs()) + "\n"); - } - if (!conjuncts_.isEmpty()) { - output.append(detailPrefix + "having: ") - .append(getExplainString(conjuncts_) + "\n"); - } - } - return output.toString(); - } - - @Override - public void computeCosts(TQueryOptions queryOptions) { - Preconditions.checkNotNull(fragment_, - "PlanNode must be placed into a fragment before calling this method."); - perHostMemCost_ = 0; - long perHostCardinality = fragment_.getNumDistinctValues(aggInfo_.getGroupingExprs()); - if (perHostCardinality == -1) { - perHostMemCost_ = DEFAULT_PER_HOST_MEM; - return; - } - - // Per-host cardinality cannot be greater than the total output cardinality. 
- if (cardinality_ != -1) { - perHostCardinality = Math.min(perHostCardinality, cardinality_); - } - perHostMemCost_ += Math.max(perHostCardinality * avgRowSize_ * - PlannerContext.HASH_TBL_SPACE_OVERHEAD, MIN_HASH_TBL_MEM); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java deleted file mode 100644 index ccbdaa2..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java +++ /dev/null @@ -1,249 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.AnalyticWindow; -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.ExprSubstitutionMap; -import com.cloudera.impala.analysis.OrderByElement; -import com.cloudera.impala.analysis.TupleDescriptor; -import com.cloudera.impala.thrift.TAnalyticNode; -import com.cloudera.impala.thrift.TExplainLevel; -import com.cloudera.impala.thrift.TPlanNode; -import com.cloudera.impala.thrift.TPlanNodeType; -import com.cloudera.impala.thrift.TQueryOptions; -import com.google.common.base.Joiner; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * Computation of analytic exprs. - */ -public class AnalyticEvalNode extends PlanNode { - private final static Logger LOG = LoggerFactory.getLogger(AnalyticEvalNode.class); - - private List<Expr> analyticFnCalls_; - - // Partitioning exprs from the AnalyticInfo - private final List<Expr> partitionExprs_; - - // TODO: Remove when the BE uses partitionByLessThan rather than the exprs - private List<Expr> substitutedPartitionExprs_; - private List<OrderByElement> orderByElements_; - private final AnalyticWindow analyticWindow_; - - // Physical tuples used/produced by this analytic node. 
- private final TupleDescriptor intermediateTupleDesc_; - private final TupleDescriptor outputTupleDesc_; - - // maps from the logical output slots in logicalTupleDesc_ to their corresponding - // physical output slots in outputTupleDesc_ - private final ExprSubstitutionMap logicalToPhysicalSmap_; - - // predicates constructed from partitionExprs_/orderingExprs_ to - // compare input to buffered tuples - private final Expr partitionByEq_; - private final Expr orderByEq_; - private final TupleDescriptor bufferedTupleDesc_; - - public AnalyticEvalNode( - PlanNodeId id, PlanNode input, List<Expr> analyticFnCalls, - List<Expr> partitionExprs, List<OrderByElement> orderByElements, - AnalyticWindow analyticWindow, TupleDescriptor intermediateTupleDesc, - TupleDescriptor outputTupleDesc, ExprSubstitutionMap logicalToPhysicalSmap, - Expr partitionByEq, Expr orderByEq, TupleDescriptor bufferedTupleDesc) { - super(id, "ANALYTIC"); - Preconditions.checkState(!tupleIds_.contains(outputTupleDesc.getId())); - analyticFnCalls_ = analyticFnCalls; - partitionExprs_ = partitionExprs; - orderByElements_ = orderByElements; - analyticWindow_ = analyticWindow; - intermediateTupleDesc_ = intermediateTupleDesc; - outputTupleDesc_ = outputTupleDesc; - logicalToPhysicalSmap_ = logicalToPhysicalSmap; - partitionByEq_ = partitionByEq; - orderByEq_ = orderByEq; - bufferedTupleDesc_ = bufferedTupleDesc; - children_.add(input); - computeTupleIds(); - } - - @Override - public void computeTupleIds() { - clearTupleIds(); - tupleIds_.addAll(getChild(0).getTupleIds()); - // we're materializing the input row augmented with the analytic output tuple - tupleIds_.add(outputTupleDesc_.getId()); - nullableTupleIds_.addAll(getChild(0).getNullableTupleIds()); - } - - @Override - public boolean isBlockingNode() { return true; } - public List<Expr> getPartitionExprs() { return partitionExprs_; } - public List<OrderByElement> getOrderByElements() { return orderByElements_; } - - @Override - public void init(Analyzer analyzer) { - Preconditions.checkState(conjuncts_.isEmpty()); - computeMemLayout(analyzer); - intermediateTupleDesc_.computeMemLayout(); - - // we add the analyticInfo's smap to the combined smap of our child - outputSmap_ = logicalToPhysicalSmap_; - createDefaultSmap(analyzer); - - // Do not assign any conjuncts here: the conjuncts out of our SelectStmt's - // Where clause have already been assigned, and conjuncts coming out of an - // enclosing scope need to be evaluated *after* all analytic computations. 
- - // do this at the end so it can take all conjuncts into account - computeStats(analyzer); - - LOG.trace("desctbl: " + analyzer.getDescTbl().debugString()); - - // point fn calls, partition and ordering exprs at our input - ExprSubstitutionMap childSmap = getCombinedChildSmap(); - analyticFnCalls_ = Expr.substituteList(analyticFnCalls_, childSmap, analyzer, false); - substitutedPartitionExprs_ = Expr.substituteList(partitionExprs_, childSmap, - analyzer, false); - orderByElements_ = OrderByElement.substitute(orderByElements_, childSmap, analyzer); - LOG.trace("evalnode: " + debugString()); - } - - @Override - protected void computeStats(Analyzer analyzer) { - super.computeStats(analyzer); - cardinality_ = getChild(0).cardinality_; - cardinality_ = capAtLimit(cardinality_); - } - - @Override - protected String debugString() { - List<String> orderByElementStrs = Lists.newArrayList(); - for (OrderByElement element: orderByElements_) { - orderByElementStrs.add(element.toSql()); - } - return Objects.toStringHelper(this) - .add("analyticFnCalls", Expr.debugString(analyticFnCalls_)) - .add("partitionExprs", Expr.debugString(partitionExprs_)) - .add("subtitutedPartitionExprs", Expr.debugString(substitutedPartitionExprs_)) - .add("orderByElements", Joiner.on(", ").join(orderByElementStrs)) - .add("window", analyticWindow_) - .add("intermediateTid", intermediateTupleDesc_.getId()) - .add("outputTid", outputTupleDesc_.getId()) - .add("partitionByEq", - partitionByEq_ != null ? partitionByEq_.debugString() : "null") - .add("orderByEq", - orderByEq_ != null ? orderByEq_.debugString() : "null") - .addValue(super.debugString()) - .toString(); - } - - @Override - protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.ANALYTIC_EVAL_NODE; - msg.analytic_node = new TAnalyticNode(); - msg.analytic_node.setIntermediate_tuple_id(intermediateTupleDesc_.getId().asInt()); - msg.analytic_node.setOutput_tuple_id(outputTupleDesc_.getId().asInt()); - msg.analytic_node.setPartition_exprs(Expr.treesToThrift(substitutedPartitionExprs_)); - msg.analytic_node.setOrder_by_exprs( - Expr.treesToThrift(OrderByElement.getOrderByExprs(orderByElements_))); - msg.analytic_node.setAnalytic_functions(Expr.treesToThrift(analyticFnCalls_)); - if (analyticWindow_ == null) { - if (!orderByElements_.isEmpty()) { - msg.analytic_node.setWindow(AnalyticWindow.DEFAULT_WINDOW.toThrift()); - } - } else { - // TODO: Window boundaries should have range_offset_predicate set - msg.analytic_node.setWindow(analyticWindow_.toThrift()); - } - if (partitionByEq_ != null) { - msg.analytic_node.setPartition_by_eq(partitionByEq_.treeToThrift()); - } - if (orderByEq_ != null) { - msg.analytic_node.setOrder_by_eq(orderByEq_.treeToThrift()); - } - if (bufferedTupleDesc_ != null) { - msg.analytic_node.setBuffered_tuple_id(bufferedTupleDesc_.getId().asInt()); - } - } - - @Override - protected String getNodeExplainString(String prefix, String detailPrefix, - TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - output.append(String.format("%s%s", prefix, getDisplayLabel())); - output.append("\n"); - if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { - output.append(detailPrefix + "functions: "); - List<String> strings = Lists.newArrayList(); - for (Expr fnCall: analyticFnCalls_) { - strings.add(fnCall.toSql()); - } - output.append(Joiner.on(", ").join(strings)); - output.append("\n"); - - if (!partitionExprs_.isEmpty()) { - output.append(detailPrefix + "partition by: "); - strings.clear(); - for (Expr 
partitionExpr: partitionExprs_) { - strings.add(partitionExpr.toSql()); - } - output.append(Joiner.on(", ").join(strings)); - output.append("\n"); - } - - if (!orderByElements_.isEmpty()) { - output.append(detailPrefix + "order by: "); - strings.clear(); - for (OrderByElement element: orderByElements_) { - strings.add(element.toSql()); - } - output.append(Joiner.on(", ").join(strings)); - output.append("\n"); - } - - if (analyticWindow_ != null) { - output.append(detailPrefix + "window: "); - output.append(analyticWindow_.toSql()); - output.append("\n"); - } - - if (!conjuncts_.isEmpty()) { - output.append( - detailPrefix + "predicates: " + getExplainString(conjuncts_) + "\n"); - } - } - return output.toString(); - } - - @Override - public void computeCosts(TQueryOptions queryOptions) { - Preconditions.checkNotNull(fragment_, - "PlanNode must be placed into a fragment before calling this method."); - // TODO: come up with estimate based on window - perHostMemCost_ = 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java deleted file mode 100644 index c02096e..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java +++ /dev/null @@ -1,815 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package com.cloudera.impala.planner; - -import java.util.Collections; -import java.util.Comparator; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.impala.analysis.AggregateInfoBase; -import com.cloudera.impala.analysis.AnalyticExpr; -import com.cloudera.impala.analysis.AnalyticInfo; -import com.cloudera.impala.analysis.AnalyticWindow; -import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.analysis.BinaryPredicate; -import com.cloudera.impala.analysis.BoolLiteral; -import com.cloudera.impala.analysis.CompoundPredicate; -import com.cloudera.impala.analysis.CompoundPredicate.Operator; -import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.analysis.ExprSubstitutionMap; -import com.cloudera.impala.analysis.IsNullPredicate; -import com.cloudera.impala.analysis.OrderByElement; -import com.cloudera.impala.analysis.SlotDescriptor; -import com.cloudera.impala.analysis.SlotRef; -import com.cloudera.impala.analysis.SortInfo; -import com.cloudera.impala.analysis.TupleDescriptor; -import com.cloudera.impala.analysis.TupleId; -import com.cloudera.impala.analysis.TupleIsNullPredicate; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.thrift.TPartitionType; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - - -/** - * The analytic planner adds plan nodes to an existing plan tree in order to - * implement the AnalyticInfo of a given query stmt. The resulting plan reflects - * similarities among analytic exprs with respect to partitioning, ordering and - * windowing to reduce data exchanges and sorts (the exchanges and sorts are currently - * not minimal). The generated plan has the following structure: - * ... - * ( - * ( - * ( - * analytic node <-- group of analytic exprs with compatible window - * )+ <-- group of analytic exprs with compatible ordering - * sort node? - * )+ <-- group of analytic exprs with compatible partitioning - * hash exchange? - * )* <-- group of analytic exprs that have different partitioning - * input plan node - * ... - */ -public class AnalyticPlanner { - private final static Logger LOG = LoggerFactory.getLogger(AnalyticPlanner.class); - - private final AnalyticInfo analyticInfo_; - private final Analyzer analyzer_; - private final PlannerContext ctx_; - - public AnalyticPlanner(AnalyticInfo analyticInfo, Analyzer analyzer, - PlannerContext ctx) { - analyticInfo_ = analyticInfo; - analyzer_ = analyzer; - ctx_ = ctx; - } - - /** - * Return plan tree that augments 'root' with plan nodes that implement single-node - * evaluation of the AnalyticExprs in analyticInfo. - * This plan takes into account a possible hash partition of its input on - * 'groupingExprs'; if this is non-null, it returns in 'inputPartitionExprs' - * a subset of the grouping exprs which should be used for the aggregate - * hash partitioning during the parallelization of 'root'. 
- * TODO: when generating sort orders for the sort groups, optimize the ordering - * of the partition exprs (so that subsequent sort operations see the input sorted - * on a prefix of their required sort exprs) - * TODO: when merging sort groups, recognize equivalent exprs - * (using the equivalence classes) rather than looking for expr equality - */ - public PlanNode createSingleNodePlan(PlanNode root, - List<Expr> groupingExprs, List<Expr> inputPartitionExprs) throws ImpalaException { - List<WindowGroup> windowGroups = collectWindowGroups(); - for (int i = 0; i < windowGroups.size(); ++i) { - windowGroups.get(i).init(analyzer_, "wg-" + i); - } - List<SortGroup> sortGroups = collectSortGroups(windowGroups); - mergeSortGroups(sortGroups); - for (SortGroup g: sortGroups) { - g.init(); - } - List<PartitionGroup> partitionGroups = collectPartitionGroups(sortGroups); - mergePartitionGroups(partitionGroups, root.getNumNodes()); - orderGroups(partitionGroups); - if (groupingExprs != null) { - Preconditions.checkNotNull(inputPartitionExprs); - computeInputPartitionExprs( - partitionGroups, groupingExprs, root.getNumNodes(), inputPartitionExprs); - } - - for (PartitionGroup partitionGroup: partitionGroups) { - for (int i = 0; i < partitionGroup.sortGroups.size(); ++i) { - root = createSortGroupPlan(root, partitionGroup.sortGroups.get(i), - i == 0 ? partitionGroup.partitionByExprs : null); - } - } - - // create equiv classes for newly added slots - analyzer_.createIdentityEquivClasses(); - return root; - } - - /** - * Coalesce sort groups that have compatible partition-by exprs and - * have a prefix relationship. - */ - private void mergeSortGroups(List<SortGroup> sortGroups) { - boolean hasMerged = false; - do { - hasMerged = false; - for (SortGroup sg1: sortGroups) { - for (SortGroup sg2: sortGroups) { - if (sg1 != sg2 && sg1.isPrefixOf(sg2)) { - sg1.absorb(sg2); - sortGroups.remove(sg2); - hasMerged = true; - break; - } - } - if (hasMerged) break; - } - } while (hasMerged); - } - - /** - * Coalesce partition groups for which the intersection of their - * partition exprs has ndv estimate > numNodes, so that the resulting plan - * still parallelizes across all nodes. - */ - private void mergePartitionGroups( - List<PartitionGroup> partitionGroups, int numNodes) { - boolean hasMerged = false; - do { - hasMerged = false; - for (PartitionGroup pg1: partitionGroups) { - for (PartitionGroup pg2: partitionGroups) { - if (pg1 != pg2) { - long ndv = Expr.getNumDistinctValues( - Expr.intersect(pg1.partitionByExprs, pg2.partitionByExprs)); - if (ndv == -1 || ndv < 0 || ndv < numNodes) { - // didn't get a usable value or the number of partitions is too small - continue; - } - pg1.merge(pg2); - partitionGroups.remove(pg2); - hasMerged = true; - break; - } - } - if (hasMerged) break; - } - } while (hasMerged); - } - - /** - * Determine the partition group that has the maximum intersection in terms - * of the estimated ndv of the partition exprs with groupingExprs. - * That partition group is placed at the front of partitionGroups, with its - * partition exprs reduced to the intersection, and the intersecting groupingExprs - * are returned in inputPartitionExprs. 
- */ - private void computeInputPartitionExprs(List<PartitionGroup> partitionGroups, - List<Expr> groupingExprs, int numNodes, List<Expr> inputPartitionExprs) { - inputPartitionExprs.clear(); - Preconditions.checkState(numNodes != -1); - // find partition group with maximum intersection - long maxNdv = 0; - PartitionGroup maxPg = null; - List<Expr> maxGroupingExprs = null; - for (PartitionGroup pg: partitionGroups) { - List<Expr> l1 = Lists.newArrayList(); - List<Expr> l2 = Lists.newArrayList(); - Expr.intersect(analyzer_, pg.partitionByExprs, groupingExprs, - analyzer_.getEquivClassSmap(), l1, l2); - // TODO: also look at l2 and take the max? - long ndv = Expr.getNumDistinctValues(l1); - if (ndv < 0 || ndv < numNodes || ndv < maxNdv) continue; - // found a better partition group - maxPg = pg; - maxPg.partitionByExprs = l1; - maxGroupingExprs = l2; - maxNdv = ndv; - } - - if (maxNdv > numNodes) { - Preconditions.checkNotNull(maxPg); - // we found a partition group that gives us enough parallelism; - // move it to the front - partitionGroups.remove(maxPg); - partitionGroups.add(0, maxPg); - inputPartitionExprs.addAll(maxGroupingExprs); - } - } - - /** - * Order partition groups (and the sort groups within them) by increasing - * totalOutputTupleSize. This minimizes the total volume of data that needs to be - * repartitioned and sorted. - * Also move the non-partitioning partition group to the end. - */ - private void orderGroups(List<PartitionGroup> partitionGroups) { - // remove the non-partitioning group from partitionGroups - PartitionGroup nonPartitioning = null; - for (PartitionGroup pg: partitionGroups) { - if (pg.partitionByExprs.isEmpty()) { - nonPartitioning = pg; - break; - } - } - if (nonPartitioning != null) partitionGroups.remove(nonPartitioning); - - // order by ascending combined output tuple size - Collections.sort(partitionGroups, - new Comparator<PartitionGroup>() { - public int compare(PartitionGroup pg1, PartitionGroup pg2) { - Preconditions.checkState(pg1.totalOutputTupleSize > 0); - Preconditions.checkState(pg2.totalOutputTupleSize > 0); - int diff = pg1.totalOutputTupleSize - pg2.totalOutputTupleSize; - return (diff < 0 ? -1 : (diff > 0 ? 1 : 0)); - } - }); - if (nonPartitioning != null) partitionGroups.add(nonPartitioning); - - for (PartitionGroup pg: partitionGroups) { - pg.orderSortGroups(); - } - } - - /** - * Create SortInfo, including sort tuple, to sort entire input row - * on sortExprs. 
- */ - private SortInfo createSortInfo( - PlanNode input, List<Expr> sortExprs, List<Boolean> isAsc, - List<Boolean> nullsFirst) { - // create tuple for sort output = the entire materialized input in a single tuple - TupleDescriptor sortTupleDesc = - analyzer_.getDescTbl().createTupleDescriptor("sort-tuple"); - ExprSubstitutionMap sortSmap = new ExprSubstitutionMap(); - List<Expr> sortSlotExprs = Lists.newArrayList(); - sortTupleDesc.setIsMaterialized(true); - for (TupleId tid: input.getTupleIds()) { - TupleDescriptor tupleDesc = analyzer_.getTupleDesc(tid); - for (SlotDescriptor inputSlotDesc: tupleDesc.getSlots()) { - if (!inputSlotDesc.isMaterialized()) continue; - SlotDescriptor sortSlotDesc = - analyzer_.copySlotDescriptor(inputSlotDesc, sortTupleDesc); - // all output slots need to be materialized - sortSlotDesc.setIsMaterialized(true); - sortSmap.put(new SlotRef(inputSlotDesc), new SlotRef(sortSlotDesc)); - sortSlotExprs.add(new SlotRef(inputSlotDesc)); - } - } - - // Lhs exprs to be substituted in ancestor plan nodes could have a rhs that contains - // TupleIsNullPredicates. TupleIsNullPredicates require specific tuple ids for - // evaluation. Since this sort materializes a new tuple, it's impossible to evaluate - // TupleIsNullPredicates referring to this sort's input after this sort, - // To preserve the information whether an input tuple was null or not this sort node, - // we materialize those rhs TupleIsNullPredicates, which are then substituted - // by a SlotRef into the sort's tuple in ancestor nodes (IMPALA-1519). - ExprSubstitutionMap inputSmap = input.getOutputSmap(); - if (inputSmap != null) { - List<Expr> tupleIsNullPredsToMaterialize = Lists.newArrayList(); - for (int i = 0; i < inputSmap.size(); ++i) { - Expr rhsExpr = inputSmap.getRhs().get(i); - // Ignore substitutions that are irrelevant at this plan node and its ancestors. - if (!rhsExpr.isBoundByTupleIds(input.getTupleIds())) continue; - rhsExpr.collect(TupleIsNullPredicate.class, tupleIsNullPredsToMaterialize); - } - Expr.removeDuplicates(tupleIsNullPredsToMaterialize); - - // Materialize relevant unique TupleIsNullPredicates. - for (Expr tupleIsNullPred: tupleIsNullPredsToMaterialize) { - SlotDescriptor sortSlotDesc = analyzer_.addSlotDescriptor(sortTupleDesc); - sortSlotDesc.setType(tupleIsNullPred.getType()); - sortSlotDesc.setIsMaterialized(true); - sortSlotDesc.setSourceExpr(tupleIsNullPred); - sortSlotDesc.setLabel(tupleIsNullPred.toSql()); - sortSlotExprs.add(tupleIsNullPred.clone()); - } - } - - SortInfo sortInfo = new SortInfo( - Expr.substituteList(sortExprs, sortSmap, analyzer_, false), isAsc, nullsFirst); - LOG.trace("sortinfo exprs: " + Expr.debugString(sortInfo.getOrderingExprs())); - sortInfo.setMaterializedTupleInfo(sortTupleDesc, sortSlotExprs); - return sortInfo; - } - - /** - * Create plan tree for the entire sort group, including all contained window groups. - * Marks the SortNode as requiring its input to be partitioned if partitionExprs - * is not null (partitionExprs represent the data partition of the entire partition - * group of which this sort group is a part). 
- */ - private PlanNode createSortGroupPlan(PlanNode root, SortGroup sortGroup, - List<Expr> partitionExprs) throws ImpalaException { - List<Expr> partitionByExprs = sortGroup.partitionByExprs; - List<OrderByElement> orderByElements = sortGroup.orderByElements; - ExprSubstitutionMap sortSmap = null; - TupleId sortTupleId = null; - TupleDescriptor bufferedTupleDesc = null; - // map from input to buffered tuple - ExprSubstitutionMap bufferedSmap = new ExprSubstitutionMap(); - - // sort on partition by (pb) + order by (ob) exprs and create pb/ob predicates - if (!partitionByExprs.isEmpty() || !orderByElements.isEmpty()) { - // first sort on partitionExprs (direction doesn't matter) - List<Expr> sortExprs = Lists.newArrayList(partitionByExprs); - List<Boolean> isAsc = - Lists.newArrayList(Collections.nCopies(sortExprs.size(), new Boolean(true))); - // TODO: utilize a direction and nulls/first last that has benefit - // for subsequent sort groups - List<Boolean> nullsFirst = - Lists.newArrayList(Collections.nCopies(sortExprs.size(), new Boolean(true))); - - // then sort on orderByExprs - for (OrderByElement orderByElement: sortGroup.orderByElements) { - sortExprs.add(orderByElement.getExpr()); - isAsc.add(orderByElement.isAsc()); - nullsFirst.add(orderByElement.getNullsFirstParam()); - } - - SortInfo sortInfo = createSortInfo(root, sortExprs, isAsc, nullsFirst); - SortNode sortNode = new SortNode(ctx_.getNextNodeId(), root, sortInfo, false, 0); - - // if this sort group does not have partitioning exprs, we want the sort - // to be executed like a regular distributed sort - if (!partitionByExprs.isEmpty()) sortNode.setIsAnalyticSort(true); - - if (partitionExprs != null) { - // create required input partition - DataPartition inputPartition = DataPartition.UNPARTITIONED; - if (!partitionExprs.isEmpty()) { - inputPartition = DataPartition.hashPartitioned(partitionExprs); - } - sortNode.setInputPartition(inputPartition); - } - - root = sortNode; - root.init(analyzer_); - sortSmap = sortNode.getOutputSmap(); - - // create bufferedTupleDesc and bufferedSmap - sortTupleId = sortNode.tupleIds_.get(0); - bufferedTupleDesc = - analyzer_.getDescTbl().copyTupleDescriptor(sortTupleId, "buffered-tuple"); - LOG.trace("desctbl: " + analyzer_.getDescTbl().debugString()); - - List<SlotDescriptor> inputSlots = analyzer_.getTupleDesc(sortTupleId).getSlots(); - List<SlotDescriptor> bufferedSlots = bufferedTupleDesc.getSlots(); - for (int i = 0; i < inputSlots.size(); ++i) { - bufferedSmap.put( - new SlotRef(inputSlots.get(i)), new SlotRef(bufferedSlots.get(i))); - } - } - - // create one AnalyticEvalNode per window group - for (WindowGroup windowGroup: sortGroup.windowGroups) { - // Create partition-by (pb) and order-by (ob) less-than predicates between the - // input tuple (the output of the preceding sort) and a buffered tuple that is - // identical to the input tuple. We need a different tuple descriptor for the - // buffered tuple because the generated predicates should compare two different - // tuple instances from the same input stream (i.e., the predicates should be - // evaluated over a row that is composed of the input and the buffered tuple). 
- - // we need to remap the pb/ob exprs to a) the sort output, b) our buffer of the - // sort input - Expr partitionByEq = null; - if (!windowGroup.partitionByExprs.isEmpty()) { - partitionByEq = createNullMatchingEquals( - Expr.substituteList(windowGroup.partitionByExprs, sortSmap, analyzer_, false), - sortTupleId, bufferedSmap); - LOG.trace("partitionByEq: " + partitionByEq.debugString()); - } - Expr orderByEq = null; - if (!windowGroup.orderByElements.isEmpty()) { - orderByEq = createNullMatchingEquals( - OrderByElement.getOrderByExprs(OrderByElement.substitute( - windowGroup.orderByElements, sortSmap, analyzer_)), - sortTupleId, bufferedSmap); - LOG.trace("orderByEq: " + orderByEq.debugString()); - } - - root = new AnalyticEvalNode(ctx_.getNextNodeId(), root, - windowGroup.analyticFnCalls, windowGroup.partitionByExprs, - windowGroup.orderByElements, windowGroup.window, - windowGroup.physicalIntermediateTuple, windowGroup.physicalOutputTuple, - windowGroup.logicalToPhysicalSmap, - partitionByEq, orderByEq, bufferedTupleDesc); - root.init(analyzer_); - } - return root; - } - - /** - * Create a predicate that checks if all exprs are equal or both sides are null. - */ - private Expr createNullMatchingEquals(List<Expr> exprs, TupleId inputTid, - ExprSubstitutionMap bufferedSmap) { - Preconditions.checkState(!exprs.isEmpty()); - Expr result = createNullMatchingEqualsAux(exprs, 0, inputTid, bufferedSmap); - result.analyzeNoThrow(analyzer_); - return result; - } - - /** - * Create an unanalyzed predicate that checks if elements >= i are equal or - * both sides are null. - * - * The predicate has the form - * ((lhs[i] is null && rhs[i] is null) || ( - * lhs[i] is not null && rhs[i] is not null && lhs[i] = rhs[i])) - * && <createEqualsAux(i + 1)> - */ - private Expr createNullMatchingEqualsAux(List<Expr> elements, int i, - TupleId inputTid, ExprSubstitutionMap bufferedSmap) { - if (i > elements.size() - 1) return new BoolLiteral(true); - - // compare elements[i] - Expr lhs = elements.get(i); - Preconditions.checkState(lhs.isBound(inputTid)); - Expr rhs = lhs.substitute(bufferedSmap, analyzer_, false); - - Expr bothNull = new CompoundPredicate(Operator.AND, - new IsNullPredicate(lhs, false), new IsNullPredicate(rhs, false)); - Expr lhsEqRhsNotNull = new CompoundPredicate(Operator.AND, - new CompoundPredicate(Operator.AND, - new IsNullPredicate(lhs, true), new IsNullPredicate(rhs, true)), - new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs)); - Expr remainder = createNullMatchingEqualsAux(elements, i + 1, inputTid, bufferedSmap); - return new CompoundPredicate(CompoundPredicate.Operator.AND, - new CompoundPredicate(Operator.OR, bothNull, lhsEqRhsNotNull), - remainder); - } - - /** - * Collection of AnalyticExprs that share the same partition-by/order-by/window - * specification. The AnalyticExprs are stored broken up into their constituent parts. - */ - private static class WindowGroup { - public final List<Expr> partitionByExprs; - public final List<OrderByElement> orderByElements; - public final AnalyticWindow window; // not null - - // Analytic exprs belonging to this window group and their corresponding logical - // intermediate and output slots from AnalyticInfo.intermediateTupleDesc_ - // and AnalyticInfo.outputTupleDesc_. - public final List<AnalyticExpr> analyticExprs = Lists.newArrayList(); - // Result of getFnCall() for every analytic expr. 
- public final List<Expr> analyticFnCalls = Lists.newArrayList(); - public final List<SlotDescriptor> logicalOutputSlots = Lists.newArrayList(); - public final List<SlotDescriptor> logicalIntermediateSlots = Lists.newArrayList(); - - // Physical output and intermediate tuples as well as an smap that maps the - // corresponding logical output slots to their physical slots in physicalOutputTuple. - // Set in init(). - public TupleDescriptor physicalOutputTuple; - public TupleDescriptor physicalIntermediateTuple; - public final ExprSubstitutionMap logicalToPhysicalSmap = new ExprSubstitutionMap(); - - public WindowGroup(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot, - SlotDescriptor logicalIntermediateSlot) { - partitionByExprs = analyticExpr.getPartitionExprs(); - orderByElements = analyticExpr.getOrderByElements(); - window = analyticExpr.getWindow(); - analyticExprs.add(analyticExpr); - analyticFnCalls.add(analyticExpr.getFnCall()); - logicalOutputSlots.add(logicalOutputSlot); - logicalIntermediateSlots.add(logicalIntermediateSlot); - } - - /** - * True if this analytic function must be evaluated in its own WindowGroup. - */ - private static boolean requiresIndependentEval(AnalyticExpr analyticExpr) { - return analyticExpr.getFnCall().getFnName().getFunction().equals( - AnalyticExpr.FIRST_VALUE_REWRITE); - } - - /** - * True if the partition exprs and ordering elements and the window of analyticExpr - * match ours. - */ - public boolean isCompatible(AnalyticExpr analyticExpr) { - if (requiresIndependentEval(analyticExprs.get(0)) || - requiresIndependentEval(analyticExpr)) { - return false; - } - - if (!Expr.equalSets(analyticExpr.getPartitionExprs(), partitionByExprs)) { - return false; - } - if (!analyticExpr.getOrderByElements().equals(orderByElements)) return false; - if ((window == null) != (analyticExpr.getWindow() == null)) return false; - if (window == null) return true; - return analyticExpr.getWindow().equals(window); - } - - /** - * Adds the given analytic expr and its logical slots to this window group. - * Assumes the corresponding analyticExpr is compatible with 'this'. - */ - public void add(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot, - SlotDescriptor logicalIntermediateSlot) { - Preconditions.checkState(isCompatible(analyticExpr)); - analyticExprs.add(analyticExpr); - analyticFnCalls.add(analyticExpr.getFnCall()); - logicalOutputSlots.add(logicalOutputSlot); - logicalIntermediateSlots.add(logicalIntermediateSlot); - } - - /** - * Creates the physical output and intermediate tuples as well as the logical to - * physical smap for this window group. Computes the mem layout for the tuple - * descriptors. - */ - public void init(Analyzer analyzer, String tupleName) { - Preconditions.checkState(physicalOutputTuple == null); - Preconditions.checkState(physicalIntermediateTuple == null); - Preconditions.checkState(analyticFnCalls.size() == analyticExprs.size()); - - // If needed, create the intermediate tuple first to maintain - // intermediateTupleId < outputTupleId for debugging purposes and consistency with - // tuple creation for aggregations. 
- boolean requiresIntermediateTuple = - AggregateInfoBase.requiresIntermediateTuple(analyticFnCalls); - if (requiresIntermediateTuple) { - physicalIntermediateTuple = - analyzer.getDescTbl().createTupleDescriptor(tupleName + "intermed"); - physicalOutputTuple = - analyzer.getDescTbl().createTupleDescriptor(tupleName + "out"); - } else { - physicalOutputTuple = - analyzer.getDescTbl().createTupleDescriptor(tupleName + "out"); - physicalIntermediateTuple = physicalOutputTuple; - } - - Preconditions.checkState(analyticExprs.size() == logicalIntermediateSlots.size()); - Preconditions.checkState(analyticExprs.size() == logicalOutputSlots.size()); - for (int i = 0; i < analyticExprs.size(); ++i) { - SlotDescriptor logicalOutputSlot = logicalOutputSlots.get(i); - SlotDescriptor physicalOutputSlot = - analyzer.copySlotDescriptor(logicalOutputSlot, physicalOutputTuple); - physicalOutputSlot.setIsMaterialized(true); - if (requiresIntermediateTuple) { - SlotDescriptor logicalIntermediateSlot = logicalIntermediateSlots.get(i); - SlotDescriptor physicalIntermediateSlot = analyzer.copySlotDescriptor( - logicalIntermediateSlot, physicalIntermediateTuple); - physicalIntermediateSlot.setIsMaterialized(true); - } - logicalToPhysicalSmap.put( - new SlotRef(logicalOutputSlot), new SlotRef(physicalOutputSlot)); - } - physicalOutputTuple.computeMemLayout(); - if (requiresIntermediateTuple) physicalIntermediateTuple.computeMemLayout(); - } - } - - /** - * Extract a minimal set of WindowGroups from analyticExprs. - */ - private List<WindowGroup> collectWindowGroups() { - List<Expr> analyticExprs = analyticInfo_.getAnalyticExprs(); - List<WindowGroup> groups = Lists.newArrayList(); - for (int i = 0; i < analyticExprs.size(); ++i) { - AnalyticExpr analyticExpr = (AnalyticExpr) analyticExprs.get(i); - // Do not generate the plan for non-materialized analytic exprs. - if (!analyticInfo_.getOutputTupleDesc().getSlots().get(i).isMaterialized()) { - continue; - } - boolean match = false; - for (WindowGroup group: groups) { - if (group.isCompatible(analyticExpr)) { - group.add((AnalyticExpr) analyticInfo_.getAnalyticExprs().get(i), - analyticInfo_.getOutputTupleDesc().getSlots().get(i), - analyticInfo_.getIntermediateTupleDesc().getSlots().get(i)); - match = true; - break; - } - } - if (!match) { - groups.add(new WindowGroup( - (AnalyticExpr) analyticInfo_.getAnalyticExprs().get(i), - analyticInfo_.getOutputTupleDesc().getSlots().get(i), - analyticInfo_.getIntermediateTupleDesc().getSlots().get(i))); - } - } - return groups; - } - - /** - * Collection of WindowGroups that share the same partition-by/order-by - * specification. - */ - private static class SortGroup { - public List<Expr> partitionByExprs; - public List<OrderByElement> orderByElements; - public List<WindowGroup> windowGroups = Lists.newArrayList(); - - // sum of windowGroups.physicalOutputTuple.getByteSize() - public int totalOutputTupleSize = -1; - - public SortGroup(WindowGroup windowGroup) { - partitionByExprs = windowGroup.partitionByExprs; - orderByElements = windowGroup.orderByElements; - windowGroups.add(windowGroup); - } - - /** - * True if the partition and ordering exprs of windowGroup match ours. 
- */ - public boolean isCompatible(WindowGroup windowGroup) { - return Expr.equalSets(windowGroup.partitionByExprs, partitionByExprs) - && windowGroup.orderByElements.equals(orderByElements); - } - - public void add(WindowGroup windowGroup) { - Preconditions.checkState(isCompatible(windowGroup)); - windowGroups.add(windowGroup); - } - - /** - * Return true if 'this' and other have compatible partition exprs and - * our orderByElements are a prefix of other's. - */ - public boolean isPrefixOf(SortGroup other) { - if (other.orderByElements.size() > orderByElements.size()) return false; - if (!Expr.equalSets(partitionByExprs, other.partitionByExprs)) return false; - for (int i = 0; i < other.orderByElements.size(); ++i) { - OrderByElement ob = orderByElements.get(i); - OrderByElement otherOb = other.orderByElements.get(i); - // TODO: compare equiv classes by comparing each equiv class's placeholder - // slotref - if (!ob.getExpr().equals(otherOb.getExpr())) return false; - if (ob.isAsc() != otherOb.isAsc()) return false; - if (ob.nullsFirst() != otherOb.nullsFirst()) return false; - } - return true; - } - - /** - * Adds other's window groups to ours, assuming that we're a prefix of other. - */ - public void absorb(SortGroup other) { - Preconditions.checkState(isPrefixOf(other)); - windowGroups.addAll(other.windowGroups); - } - - /** - * Compute totalOutputTupleSize. - */ - public void init() { - totalOutputTupleSize = 0; - for (WindowGroup g: windowGroups) { - TupleDescriptor outputTuple = g.physicalOutputTuple; - Preconditions.checkState(outputTuple.isMaterialized()); - Preconditions.checkState(outputTuple.getByteSize() != -1); - totalOutputTupleSize += outputTuple.getByteSize(); - } - } - - private static class SizeLt implements Comparator<WindowGroup> { - public int compare(WindowGroup wg1, WindowGroup wg2) { - Preconditions.checkState(wg1.physicalOutputTuple != null - && wg1.physicalOutputTuple.getByteSize() != -1); - Preconditions.checkState(wg2.physicalOutputTuple != null - && wg2.physicalOutputTuple.getByteSize() != -1); - int diff = wg1.physicalOutputTuple.getByteSize() - - wg2.physicalOutputTuple.getByteSize(); - return (diff < 0 ? -1 : (diff > 0 ? 1 : 0)); - } - } - - private static final SizeLt SIZE_LT; - static { - SIZE_LT = new SizeLt(); - } - - /** - * Order window groups by increasing size of the output tuple. This minimizes - * the total volume of data that needs to be buffered. - */ - public void orderWindowGroups() { - Collections.sort(windowGroups, SIZE_LT); - } - } - - /** - * Partitions the windowGroups into SortGroups based on compatible order by exprs. - */ - private List<SortGroup> collectSortGroups(List<WindowGroup> windowGroups) { - List<SortGroup> sortGroups = Lists.newArrayList(); - for (WindowGroup windowGroup: windowGroups) { - boolean match = false; - for (SortGroup sortGroup: sortGroups) { - if (sortGroup.isCompatible(windowGroup)) { - sortGroup.add(windowGroup); - match = true; - break; - } - } - if (!match) sortGroups.add(new SortGroup(windowGroup)); - } - return sortGroups; - } - - /** - * Collection of SortGroups that have compatible partition-by specifications. 
- */ - private static class PartitionGroup { - public List<Expr> partitionByExprs; - public List<SortGroup> sortGroups = Lists.newArrayList(); - - // sum of sortGroups.windowGroups.physicalOutputTuple.getByteSize() - public int totalOutputTupleSize = -1; - - public PartitionGroup(SortGroup sortGroup) { - partitionByExprs = sortGroup.partitionByExprs; - sortGroups.add(sortGroup); - totalOutputTupleSize = sortGroup.totalOutputTupleSize; - } - - /** - * True if the partition exprs of sortGroup are compatible with ours. - * For now that means equality. - */ - public boolean isCompatible(SortGroup sortGroup) { - return Expr.equalSets(sortGroup.partitionByExprs, partitionByExprs); - } - - public void add(SortGroup sortGroup) { - Preconditions.checkState(isCompatible(sortGroup)); - sortGroups.add(sortGroup); - totalOutputTupleSize += sortGroup.totalOutputTupleSize; - } - - /** - * Merge 'other' into 'this' - * - partitionByExprs is the intersection of the two - * - sortGroups becomes the union - */ - public void merge(PartitionGroup other) { - partitionByExprs = Expr.intersect(partitionByExprs, other.partitionByExprs); - Preconditions.checkState(Expr.getNumDistinctValues(partitionByExprs) >= 0); - sortGroups.addAll(other.sortGroups); - } - - /** - * Order sort groups by increasing totalOutputTupleSize. This minimizes the total - * volume of data that needs to be sorted. - */ - public void orderSortGroups() { - Collections.sort(sortGroups, - new Comparator<SortGroup>() { - public int compare(SortGroup sg1, SortGroup sg2) { - Preconditions.checkState(sg1.totalOutputTupleSize > 0); - Preconditions.checkState(sg2.totalOutputTupleSize > 0); - int diff = sg1.totalOutputTupleSize - sg2.totalOutputTupleSize; - return (diff < 0 ? -1 : (diff > 0 ? 1 : 0)); - } - }); - for (SortGroup sortGroup: sortGroups) { - sortGroup.orderWindowGroups(); - } - } - } - - /** - * Extract a minimal set of PartitionGroups from sortGroups. - */ - private List<PartitionGroup> collectPartitionGroups(List<SortGroup> sortGroups) { - List<PartitionGroup> partitionGroups = Lists.newArrayList(); - for (SortGroup sortGroup: sortGroups) { - boolean match = false; - for (PartitionGroup partitionGroup: partitionGroups) { - if (partitionGroup.isCompatible(sortGroup)) { - partitionGroup.add(sortGroup); - match = true; - break; - } - } - if (!match) partitionGroups.add(new PartitionGroup(sortGroup)); - } - return partitionGroups; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/CohortId.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/CohortId.java b/fe/src/main/java/com/cloudera/impala/planner/CohortId.java deleted file mode 100644 index d58e5c4..0000000 --- a/fe/src/main/java/com/cloudera/impala/planner/CohortId.java +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.planner; - -import com.cloudera.impala.common.Id; -import com.cloudera.impala.common.IdGenerator; - -public class CohortId extends Id<CohortId> { - // Construction only allowed via an IdGenerator. - protected CohortId(int id) { - super(id); - } - - public static IdGenerator<CohortId> createGenerator() { - return new IdGenerator<CohortId>() { - @Override - public CohortId getNextId() { return new CohortId(nextId_++); } - @Override - public CohortId getMaxId() { return new CohortId(nextId_ - 1); } - }; - } - - @Override - public String toString() { - return String.format("%02d", id_); - } -}
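For readers skimming the deleted CohortId.java above, the following is a minimal, self-contained sketch of the Id/IdGenerator construction pattern that class relies on. The Sketch* names and the main() driver are hypothetical stand-ins added purely for illustration; Impala's real Id<T> and IdGenerator<T> live in com.cloudera.impala.common and are not reproduced here.

  // Illustrative sketch (not Impala code): a minimal generator/id pair that
  // mirrors the pattern of the deleted CohortId class above.
  abstract class SketchIdGenerator<T> {
    protected int nextId_ = 0;
    public abstract T getNextId();
  }

  class SketchCohortId {
    private final int id_;

    // Construction only via a generator, as in the deleted class.
    protected SketchCohortId(int id) { id_ = id; }

    public static SketchIdGenerator<SketchCohortId> createGenerator() {
      return new SketchIdGenerator<SketchCohortId>() {
        @Override
        public SketchCohortId getNextId() { return new SketchCohortId(nextId_++); }
      };
    }

    @Override
    public String toString() { return String.format("%02d", id_); }

    public static void main(String[] args) {
      SketchIdGenerator<SketchCohortId> gen = SketchCohortId.createGenerator();
      System.out.println(gen.getNextId()); // prints "00"
      System.out.println(gen.getNextId()); // prints "01"
    }
  }

Funnelling construction through the generator keeps the ids dense and monotonically increasing, and the zero-padded toString() mirrors the formatting used by the deleted class.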

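A related note on the AnalyticPlanner.java removal further above: createNullMatchingEqualsAux builds a conjunction of NULL-matching equality checks between the sort's input tuple and its buffered duplicate, so that partition/order boundaries treat NULL as equal to NULL. The snippet below is a plain-Java sketch of that row-level semantics, not Impala code; the helper name and the use of java.util.List stand in for the planner's Expr/predicate machinery, and the two input lists are assumed to have the same length.

  import java.util.Arrays;
  import java.util.List;
  import java.util.Objects;

  public final class NullMatchingEquals {
    // For each position i, accept the pair when both sides are NULL, or when
    // both are non-NULL and equal; AND the result across all positions.
    static boolean nullMatchingEquals(List<?> lhs, List<?> rhs) {
      for (int i = 0; i < lhs.size(); ++i) {
        Object l = lhs.get(i);
        Object r = rhs.get(i);
        boolean bothNull = (l == null && r == null);
        boolean bothNonNullAndEqual = (l != null && r != null && Objects.equals(l, r));
        if (!(bothNull || bothNonNullAndEqual)) return false;
      }
      return true;
    }

    public static void main(String[] args) {
      // Same partition: equal non-NULL key, matching NULLs.
      System.out.println(nullMatchingEquals(
          Arrays.asList("a", null), Arrays.asList("a", null))); // true
      // Different partition: NULL on one side only.
      System.out.println(nullMatchingEquals(
          Arrays.asList("a", null), Arrays.asList("a", "b"))); // false
    }
  }

Per expression this is essentially SQL's IS NOT DISTINCT FROM comparison, ANDed over all partition-by (or order-by) expressions.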