This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 723b764bc9 Allow extra aggregation types in RealtimeToOfflineSegmentsTask (#10982)
723b764bc9 is described below

commit 723b764bc91275c0b8361d3f9135f151b6404c39
Author: Andi Miller <[email protected]>
AuthorDate: Thu Jul 20 23:05:30 2023 +0100

    Allow extra aggregation types in RealtimeToOfflineSegmentsTask (#10982)
---
 .../aggregator/ValueAggregatorFactory.java         |  3 +++
 .../segment/local/utils/TableConfigUtils.java      | 24 ++++++++++++++++++----
 .../segment/local/utils/TableConfigUtilsTest.java  | 21 +++++++++++++++++++
 3 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
index 4cd5a1ea6d..cd5388870d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
@@ -32,6 +32,9 @@ public class ValueAggregatorFactory {
 
   /**
    * Constructs a ValueAggregator from the given aggregation type.
+   *
+   * When adding entries to this please add them to the Set in org.apache.pinot.segment.local.utils.TableConfigUtils
+   * named AVAILABLE_CORE_VALUE_AGGREGATORS so that they can be used in RealtimeToOfflineTask
    */
   public static ValueAggregator getValueAggregator(AggregationFunctionType aggregationType, DataType dataType) {
     switch (aggregationType) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 89cac72cac..76cba334e5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -83,6 +83,8 @@ import org.quartz.CronScheduleBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.segment.spi.AggregationFunctionType.*;
+
 
 /**
  * Utils related to table config operations
@@ -103,8 +105,7 @@ public final class TableConfigUtils {
   // hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency.
   private static final String KINESIS_STREAM_TYPE = "kinesis";
   private static final EnumSet<AggregationFunctionType> SUPPORTED_INGESTION_AGGREGATIONS =
-      EnumSet.of(AggregationFunctionType.SUM, AggregationFunctionType.MIN, AggregationFunctionType.MAX,
-          AggregationFunctionType.COUNT);
+      EnumSet.of(SUM, MIN, MAX, COUNT);
   private static final Set<String> UPSERT_DEDUP_ALLOWED_ROUTING_STRATEGIES =
       ImmutableSet.of(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
           RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE);
@@ -483,6 +484,11 @@ public final class TableConfigUtils {
     }
   }
 
+  public final static EnumSet<AggregationFunctionType> AVAILABLE_CORE_VALUE_AGGREGATORS =
+      EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTTHETASKETCH,
+          DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH,
+          SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH);
+
   @VisibleForTesting
   static void validateTaskConfigs(TableConfig tableConfig, Schema schema) {
     TableTaskConfig taskConfig = tableConfig.getTaskConfig();
@@ -523,8 +529,18 @@ public final class TableConfigUtils {
             if (entry.getKey().endsWith(".aggregationType")) {
               Preconditions.checkState(columnNames.contains(StringUtils.removeEnd(entry.getKey(), ".aggregationType")),
                   String.format("Column \"%s\" not found in schema!", entry.getKey()));
-              Preconditions.checkState(ImmutableSet.of("SUM", "MAX", "MIN").contains(entry.getValue().toUpperCase()),
-                  String.format("Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue()));
+              try {
+                // check that it's a valid aggregation function type
+                AggregationFunctionType aft = AggregationFunctionType.getAggregationFunctionType(entry.getValue());
+                // check that a value aggregator is available
+                if (!AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft)) {
+                  throw new IllegalArgumentException("ValueAggregator not enabled for type: " + aft.toString());
+                }
+              } catch (IllegalArgumentException e) {
+                String err = String.format(
+                    "Column \"%s\" has invalid aggregate type: %s", entry.getKey(), entry.getValue());
+                throw new IllegalStateException(err);
+              }
             }
           }
         }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index a848fd1247..89f380d049 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1755,6 +1755,27 @@ public class TableConfigUtilsTest {
     } catch (IllegalStateException e) {
       Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
     }
+
+    // aggregation function that exists but has no ValueAggregator available
+    HashMap<String, String> invalidAgg2Config = new HashMap<>(realtimeToOfflineTaskConfig);
+    invalidAgg2Config.put("myCol.aggregationType", "Histogram");
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig(
+        ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAgg2Config, "SegmentGenerationAndPushTask",
+            segmentGenerationAndPushTaskConfig))).build();
+    try {
+      TableConfigUtils.validateTaskConfigs(tableConfig, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
+    }
+
+    // valid agg
+    HashMap<String, String> validAggConfig = new HashMap<>(realtimeToOfflineTaskConfig);
+    validAggConfig.put("myCol.aggregationType", "distinctCountHLL");
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTaskConfig(new TableTaskConfig(
+        ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAggConfig, "SegmentGenerationAndPushTask",
+            segmentGenerationAndPushTaskConfig))).build();
+    TableConfigUtils.validateTaskConfigs(tableConfig, schema);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to