jcamachor commented on a change in pull request #960: HIVE-23030 ds rollup union
URL: https://github.com/apache/hive/pull/960#discussion_r399341201
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java
##########
@@ -53,169 +73,278 @@
private static final String SKETCH_TO_VARIANCES = "variances";
private static final String SKETCH_TO_PERCENTILE = "percentile";
- private final Registry system;
+ private final List<SketchDescriptor> sketchClasses;
+ private final ArrayList<UDFDescriptor> descriptors;
+
+ DataSketchesFunctions() {
+ this.sketchClasses = new ArrayList<SketchDescriptor>();
+ this.descriptors = new ArrayList<HiveUDFPlugin.UDFDescriptor>();
+ registerHll();
+ registerCpc();
+ registerKll();
+ registerTheta();
+ registerTuple();
+ registerQuantiles();
+ registerFrequencies();
+
+ buildCalciteFns();
+ buildDescritors();
+ }
+
+ @Override
+ public Iterable<UDFDescriptor> getDescriptors() {
+ return descriptors;
+ }
- public DataSketchesFunctions(Registry system) {
- this.system = system;
+ private void buildDescritors() {
+ for (SketchDescriptor sketchDescriptor : sketchClasses) {
+ descriptors.addAll(sketchDescriptor.fnMap.values());
+ }
}
- public static void register(Registry system) {
- DataSketchesFunctions dsf = new DataSketchesFunctions(system);
- String prefix = "ds";
- dsf.registerHll(prefix);
- dsf.registerCpc(prefix);
- dsf.registerKll(prefix);
- dsf.registerTheta(prefix);
- dsf.registerTuple(prefix);
- dsf.registerQuantiles(prefix);
- dsf.registerFrequencies(prefix);
+ private void buildCalciteFns() {
+ for (SketchDescriptor sd : sketchClasses) {
+ // Mergability is exposed to Calcite; which enables to use it during
rollup.
+
+ RelProtoDataType sketchType = RelDataTypeImpl.proto(SqlTypeName.BINARY,
true);
+
+ SketchFunctionDescriptor sketchSFD = sd.fnMap.get(DATA_TO_SKETCH);
+ SketchFunctionDescriptor unionSFD = sd.fnMap.get(UNION_SKETCH);
+
+ if (sketchSFD == null || unionSFD == null) {
+ continue;
+ }
+
+ HiveMergeablAggregate unionFn = new HiveMergeablAggregate(unionSFD.name,
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.explicit(sketchType),
+ InferTypes.ANY_NULLABLE,
+ OperandTypes.family(),
+ null);
+
+ HiveMergeablAggregate sketchFn = new
HiveMergeablAggregate(sketchSFD.name,
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.explicit(sketchType),
+ InferTypes.ANY_NULLABLE,
+ OperandTypes.family(),
+ unionFn);
+
+ unionSFD.setCalciteFunction(unionFn);
+ sketchSFD.setCalciteFunction(sketchFn);
+ }
}
- private void registerHll(String prefix) {
- String p = prefix + "_hll_";
- registerUDAF(org.apache.datasketches.hive.hll.DataToSketchUDAF.class, p +
DATA_TO_SKETCH);
-
registerUDF(org.apache.datasketches.hive.hll.SketchToEstimateAndErrorBoundsUDF.class,
- p + SKETCH_TO_ESTIMATE_WITH_ERROR_BOUNDS);
- registerUDF(org.apache.datasketches.hive.hll.SketchToEstimateUDF.class, p
+ SKETCH_TO_ESTIMATE);
- registerUDF(org.apache.datasketches.hive.hll.SketchToStringUDF.class, p +
SKETCH_TO_STRING);
- registerUDF(org.apache.datasketches.hive.hll.UnionSketchUDF.class, p +
UNION_SKETCH1);
- registerUDAF(org.apache.datasketches.hive.hll.UnionSketchUDAF.class, p +
UNION_SKETCH);
+
+ private void registerHiveFunctionsInternal(Registry system) {
+ for (SketchDescriptor sketchDescriptor : sketchClasses) {
+ Collection<SketchFunctionDescriptor> functions =
sketchDescriptor.fnMap.values();
+ for (SketchFunctionDescriptor fn : functions) {
+ if (UDF.class.isAssignableFrom(fn.udfClass)) {
+ system.registerUDF(fn.name, (Class<? extends UDF>) fn.udfClass,
false);
+ continue;
+ }
+ if (GenericUDAFResolver2.class.isAssignableFrom(fn.udfClass)) {
+ String name = fn.name;
+ try {
+ system.registerGenericUDAF(name, ((Class<? extends
GenericUDAFResolver2>) fn.udfClass).newInstance());
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException("Unable to register: " + name, e);
+ }
+ continue;
+ }
+ if (GenericUDTF.class.isAssignableFrom(fn.udfClass)) {
+ system.registerGenericUDTF(fn.name, (Class<? extends GenericUDTF>)
fn.udfClass);
+ continue;
+ }
+ throw new RuntimeException("Don't know how to register: " + fn.name);
+ }
+ }
+
}
- private void registerCpc(String prefix) {
- String p = prefix + "_cpc_";
- registerUDAF(org.apache.datasketches.hive.cpc.DataToSketchUDAF.class, p +
DATA_TO_SKETCH);
+ private static class SketchFunctionDescriptor implements
HiveUDFPlugin.UDFDescriptor {
+ String name;
+ Class<?> udfClass;
+ private SqlFunction calciteFunction;
+
+ public SketchFunctionDescriptor(String name, Class<?> udfClass) {
+ this.name = name;
+ this.udfClass = udfClass;
+ }
+
+ @Override
+ public Class<?> getUDFClass() {
+ return udfClass;
+ }
+
+ @Override
+ public String getFunctionName() {
+ return name;
+ }
+
+ @Override
+ public Optional<SqlFunction> getCalciteFunction() {
+ return Optional.ofNullable(calciteFunction);
+ }
+
+ public void setCalciteFunction(SqlFunction calciteFunction) {
+ this.calciteFunction = calciteFunction;
+ }
+ }
+
+ static class SketchDescriptor {
+ Map<String, SketchFunctionDescriptor> fnMap;
+ private String functionPrefix;
+
+ public SketchDescriptor(String string) {
+ fnMap = new HashMap<String, SketchFunctionDescriptor>();
+ functionPrefix = DATASKETCHES_PREFIX + "_" + string + "_";
+ }
+
+ private void register(String name, Class<?> clazz) {
+ fnMap.put(name, new SketchFunctionDescriptor(functionPrefix + name,
clazz));
+ }
+ }
+
+ private void registerHll() {
+ String p = "asd";
Review comment:
Never used?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]