shenyu0127 commented on code in PR #10594:
URL: https://github.com/apache/pinot/pull/10594#discussion_r1175905387


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunction.java:
##########
@@ -33,48 +35,31 @@
 /**
  * The <code>CoalesceTransformFunction</code> implements the Coalesce operator.
  *
- * The results are in String format for first non-null value in the argument 
list.
- * If all arguments are null, return a 'null' string.
- * Note: arguments have to be column names and single type. The type can be 
either numeric or string.
+ * The results are first non-null value in the argument list.

Review Comment:
   nit: s/The results are first non-null value in the argument list./The result 
is the first non-null value in the argument list/



##########
pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunctionTest.java:
##########


Review Comment:
   Add a unit test for "COALESCE(NULL, NULL)".



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunction.java:
##########
@@ -33,48 +35,31 @@
 /**
  * The <code>CoalesceTransformFunction</code> implements the Coalesce operator.
  *
- * The results are in String format for first non-null value in the argument 
list.
- * If all arguments are null, return a 'null' string.
- * Note: arguments have to be column names and single type. The type can be 
either numeric or string.
+ * The results are first non-null value in the argument list.
+ * If all arguments are null, return null.
+ *
+ * Note: arguments have to be compatible type.
  * Number of arguments has to be greater than 0.
  *
  * Expected result:
  * Coalesce(nullColumn, columnA): columnA
  * Coalesce(columnA, nullColumn): nullColumn

Review Comment:
   s/Coalesce(columnA, nullColumn): nullColumn/Coalesce(columnA, nullColumn): 
columnA



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IsNotNullTransformFunction.java:
##########
@@ -26,12 +26,12 @@
 import org.apache.pinot.core.operator.ColumnContext;
 import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
-import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
-import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.IntConsumer;
+import org.roaringbitmap.RoaringBitmap;
 
 
 public class IsNotNullTransformFunction extends BaseTransformFunction {

Review Comment:
   Make IsNotNullTransformFunction a subclass of IsNullTransformFunction?



##########
pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunctionTest.java:
##########
@@ -18,456 +18,78 @@
  */
 package org.apache.pinot.core.operator.transform.function;
 
-import java.io.File;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.RequestContextUtils;
-import org.apache.pinot.core.operator.DocIdSetOperator;
-import org.apache.pinot.core.operator.ProjectionOperator;
-import org.apache.pinot.core.operator.blocks.ProjectionBlock;
-import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
-import org.apache.pinot.core.plan.DocIdSetPlanNode;
-import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.roaringbitmap.RoaringBitmap;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
 public class CoalesceTransformFunctionTest extends BaseTransformFunctionTest {
-  private static final String ENABLE_NULL_SEGMENT_NAME = "testSegment1";
-  private static final String DISABLE_NULL_SEGMENT_NAME = "testSegment2";
-  private static final Random RANDOM = new Random();
-
-  private static final int NUM_ROWS = 1000;
-  private static final String INT_SV_COLUMN1 = "intSV1";
-  private static final String INT_SV_COLUMN2 = "intSV2";
-  private static final String STRING_SV_COLUMN1 = "StringSV1";
-  private static final String STRING_SV_COLUMN2 = "StringSV2";
-  private static final String BIG_DECIMAL_SV_COLUMN1 = "BigDecimalSV1";
-  private static final String BIG_DECIMAL_SV_COLUMN2 = "BigDecimalSV2";
-  private static final String LONG_SV_COLUMN1 = "LongSV1";
-  private static final String LONG_SV_COLUMN2 = "LongSV2";
-  private static final String DOUBLE_SV_COLUMN1 = "DoubleSV1";
-  private static final String DOUBLE_SV_COLUMN2 = "DoubleSV2";
-
-  private static final String FLOAT_SV_COLUMN1 = "FloatSV1";
-  private static final String FLOAT_SV_COLUMN2 = "FLoatSV2";
-  private final int[] _intSVValues = new int[NUM_ROWS];
-  private final double[] _doubleValues = new double[NUM_ROWS];
-  private final float[] _floatValues = new float[NUM_ROWS];
-  private final String[] _stringSVValues = new String[NUM_ROWS];
-  private Map<String, DataSource> _enableNullDataSourceMap;
-  private Map<String, DataSource> _disableNullDataSourceMap;
-  private ProjectionBlock _enableNullProjectionBlock;
-  private ProjectionBlock _disableNullProjectionBlock;
-  // Mod decides whether the first column of the same type should be null.
-  private static final int NULL_MOD1 = 3;
-  // Mod decides whether the second column of the same type should be null.
-  private static final int NULL_MOD2 = 5;
-  // Difference between two same type numeric columns.
-  private static final int INT_VALUE_SHIFT = 2;
-  private static final double DOUBLE_VALUE_SHIFT = 0.1;
-  private static final float FLOAT_VALUE_SHIFT = 0.1f;
-
-  // Suffix for second string column.
-  private static final String SUFFIX = "column2";
-
-  private static String getIndexDirPath(String segmentName) {
-    return FileUtils.getTempDirectoryPath() + File.separator + segmentName;
-  }
-
-  private static Map<String, DataSource> getDataSourceMap(Schema schema, 
List<GenericRow> rows, String segmentName)
-      throws Exception {
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(segmentName).setNullHandlingEnabled(true).build();
-    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
-    config.setOutDir(getIndexDirPath(segmentName));
-    config.setSegmentName(segmentName);
-    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-    driver.init(config, new GenericRowRecordReader(rows));
-    driver.build();
-    IndexSegment indexSegment =
-        ImmutableSegmentLoader.load(new File(getIndexDirPath(segmentName), 
segmentName), ReadMode.heap);
-    Set<String> columnNames = indexSegment.getPhysicalColumnNames();
-    Map<String, DataSource> enableNullDataSourceMap = new 
HashMap<>(columnNames.size());
-    for (String columnName : columnNames) {
-      enableNullDataSourceMap.put(columnName, 
indexSegment.getDataSource(columnName));
-    }
-    return enableNullDataSourceMap;
-  }
-
-  private static ProjectionBlock getProjectionBlock(Map<String, DataSource> 
dataSourceMap) {
-    return new ProjectionOperator(dataSourceMap,
-        new DocIdSetOperator(new MatchAllFilterOperator(NUM_ROWS), 
DocIdSetPlanNode.MAX_DOC_PER_CALL)).nextBlock();
-  }
-
-  private static boolean isColumn1Null(int i) {
-    return i % NULL_MOD1 == 0;
-  }
-
-  private static boolean isColumn2Null(int i) {
-    return i % NULL_MOD2 == 0;
-  }
-
-  @BeforeClass
-  public void setup()
-      throws Exception {
-    // Set up two tables: one with null option enable, the other with null 
option disable.
-    // Each table one string column, and one int column with some rows set to 
null.
-    FileUtils.deleteQuietly(new 
File(getIndexDirPath(DISABLE_NULL_SEGMENT_NAME)));
-    FileUtils.deleteQuietly(new 
File(getIndexDirPath(ENABLE_NULL_SEGMENT_NAME)));
-    for (int i = 0; i < NUM_ROWS; i++) {
-      _intSVValues[i] = RANDOM.nextInt();
-      _doubleValues[i] = RANDOM.nextDouble();
-      _floatValues[i] = RANDOM.nextFloat();
-      _stringSVValues[i] = "a" + RANDOM.nextInt();
-    }
-    List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Map<String, Object> map = new HashMap<>();
-      map.put(INT_SV_COLUMN1, _intSVValues[i]);
-      map.put(INT_SV_COLUMN2, _intSVValues[i] + INT_VALUE_SHIFT);
-      map.put(DOUBLE_SV_COLUMN1, _doubleValues[i]);
-      map.put(DOUBLE_SV_COLUMN2, _doubleValues[i] + DOUBLE_VALUE_SHIFT);
-      map.put(FLOAT_SV_COLUMN1, _floatValues[i]);
-      map.put(FLOAT_SV_COLUMN2, _floatValues[i] + FLOAT_VALUE_SHIFT);
-      map.put(STRING_SV_COLUMN1, _stringSVValues[i]);
-      map.put(STRING_SV_COLUMN2, _stringSVValues[i] + SUFFIX);
-      map.put(BIG_DECIMAL_SV_COLUMN1, BigDecimal.valueOf(_intSVValues[i]));
-      map.put(BIG_DECIMAL_SV_COLUMN2, BigDecimal.valueOf(_intSVValues[i] + 
INT_VALUE_SHIFT));
-      map.put(LONG_SV_COLUMN1, _intSVValues[i]);
-      map.put(LONG_SV_COLUMN2, _intSVValues[i] + INT_VALUE_SHIFT);
-
-      if (isColumn1Null(i)) {
-        map.put(INT_SV_COLUMN1, null);
-        map.put(STRING_SV_COLUMN1, null);
-        map.put(BIG_DECIMAL_SV_COLUMN1, null);
-        map.put(LONG_SV_COLUMN1, null);
-        map.put(DOUBLE_SV_COLUMN1, null);
-        map.put(FLOAT_SV_COLUMN1, null);
-      }
-      if (isColumn2Null(i)) {
-        map.put(INT_SV_COLUMN2, null);
-        map.put(STRING_SV_COLUMN2, null);
-        map.put(LONG_SV_COLUMN2, null);
-        map.put(BIG_DECIMAL_SV_COLUMN2, null);
-        map.put(DOUBLE_SV_COLUMN2, null);
-        map.put(FLOAT_SV_COLUMN2, null);
-      }
-      GenericRow row = new GenericRow();
-      row.init(map);
-      rows.add(row);
-    }
-    Schema schema = new 
Schema.SchemaBuilder().addSingleValueDimension(INT_SV_COLUMN1, 
FieldSpec.DataType.INT)
-        .addSingleValueDimension(INT_SV_COLUMN2, FieldSpec.DataType.INT)
-        .addSingleValueDimension(STRING_SV_COLUMN1, FieldSpec.DataType.STRING)
-        .addSingleValueDimension(STRING_SV_COLUMN2, FieldSpec.DataType.STRING)
-        .addSingleValueDimension(LONG_SV_COLUMN1, FieldSpec.DataType.LONG)
-        .addSingleValueDimension(LONG_SV_COLUMN2, FieldSpec.DataType.LONG)
-        .addSingleValueDimension(DOUBLE_SV_COLUMN1, FieldSpec.DataType.DOUBLE)
-        .addSingleValueDimension(DOUBLE_SV_COLUMN2, FieldSpec.DataType.DOUBLE)
-        .addSingleValueDimension(FLOAT_SV_COLUMN1, FieldSpec.DataType.FLOAT)
-        .addSingleValueDimension(FLOAT_SV_COLUMN2, FieldSpec.DataType.FLOAT)
-        .addMetric(BIG_DECIMAL_SV_COLUMN1, FieldSpec.DataType.BIG_DECIMAL)
-        .addMetric(BIG_DECIMAL_SV_COLUMN2, 
FieldSpec.DataType.BIG_DECIMAL).build();
-    _enableNullDataSourceMap = getDataSourceMap(schema, rows, 
ENABLE_NULL_SEGMENT_NAME);
-    _enableNullProjectionBlock = getProjectionBlock(_enableNullDataSourceMap);
-    _disableNullDataSourceMap = getDataSourceMap(schema, rows, 
DISABLE_NULL_SEGMENT_NAME);
-    _disableNullProjectionBlock = 
getProjectionBlock(_disableNullDataSourceMap);
-  }
-
-  private static void testIntTransformFunction(ExpressionContext expression, 
int[] expectedValues,
-      ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap)
-      throws Exception {
-    int[] actualValues =
-        TransformFunctionFactory.get(expression, 
dataSourceMap).transformToIntValuesSV(projectionBlock);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Assert.assertEquals(actualValues[i], expectedValues[i]);
-    }
-  }
-
-  private static void testStringTransformFunction(ExpressionContext 
expression, String[] expectedValues,
-      ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap)
-      throws Exception {
-    String[] actualValues =
-        TransformFunctionFactory.get(expression, 
dataSourceMap).transformToStringValuesSV(projectionBlock);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Assert.assertEquals(actualValues[i], expectedValues[i]);
-    }
-  }
-
-  private static void testLongTransformFunction(ExpressionContext expression, 
long[] expectedValues,
-      ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap)
-      throws Exception {
-    long[] actualValues =
-        TransformFunctionFactory.get(expression, 
dataSourceMap).transformToLongValuesSV(projectionBlock);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Assert.assertEquals(actualValues[i], expectedValues[i]);
-    }
-  }
-
-  private static void testDoubleTransformFunction(ExpressionContext 
expression, double[] expectedValues,
-      ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap)
-      throws Exception {
-    double[] actualValues =
-        TransformFunctionFactory.get(expression, 
dataSourceMap).transformToDoubleValuesSV(projectionBlock);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Assert.assertEquals(actualValues[i], expectedValues[i]);
-    }
-  }
-
-  private static void testFloatTransformFunction(ExpressionContext expression, 
float[] expectedValues,
-      ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap)
-      throws Exception {
-    float[] actualValues =
-        TransformFunctionFactory.get(expression, 
dataSourceMap).transformToFloatValuesSV(projectionBlock);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Assert.assertEquals(actualValues[i], expectedValues[i]);
-    }
-  }
-
-  private static void testBigDecimalTransformFunction(ExpressionContext 
expression, BigDecimal[] expectedValues,
-      ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap)
-      throws Exception {
-    BigDecimal[] actualValues =
-        TransformFunctionFactory.get(expression, 
dataSourceMap).transformToBigDecimalValuesSV(projectionBlock);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Assert.assertEquals(actualValues[i], expectedValues[i]);
-    }
-  }
-
-  // Test the Coalesce on two Int columns where one or the other or both can 
be null.
   @Test
-  public void testCoalesceIntColumns()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", 
INT_SV_COLUMN1, INT_SV_COLUMN2));
-    TransformFunction coalesceTransformFunction = 
TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce");
-    int[] expectedResults = new int[NUM_ROWS];
-    for (int i = 0; i < NUM_ROWS; i++) {
-      if (isColumn1Null(i) && isColumn2Null(i)) {
-        expectedResults[i] = CoalesceTransformFunction.NULL_INT;
-      } else if (isColumn1Null(i)) {
-        expectedResults[i] = _intSVValues[i] + INT_VALUE_SHIFT;
-      } else if (isColumn2Null(i)) {
-        expectedResults[i] = _intSVValues[i];
-      } else {
-        expectedResults[i] = _intSVValues[i];
-      }
-    }
-    testIntTransformFunction(coalesceExpr, expectedResults, 
_enableNullProjectionBlock, _enableNullDataSourceMap);
-    testIntTransformFunction(coalesceExpr, expectedResults, 
_disableNullProjectionBlock, _disableNullDataSourceMap);
-  }
+  public void testCoalesceIntColumns() {
+    TransformFunction coalesceFunc = TransformFunctionFactory.get(
+        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", 
INT_SV_NULL_COLUMN, LONG_SV_COLUMN)),
+        _dataSourceMap);
 
-  // Test the Coalesce on two long columns where one or the other or both can 
be null.
-  @Test
-  public void testCoalesceLongColumns()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", 
LONG_SV_COLUMN1, LONG_SV_COLUMN2));
-    TransformFunction coalesceTransformFunction = 
TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce");
     long[] expectedResults = new long[NUM_ROWS];
     for (int i = 0; i < NUM_ROWS; i++) {
-      if (isColumn1Null(i) && isColumn2Null(i)) {
-        expectedResults[i] = CoalesceTransformFunction.NULL_LONG;
-      } else if (isColumn1Null(i)) {
-        expectedResults[i] = _intSVValues[i] + INT_VALUE_SHIFT;
-      } else if (isColumn2Null(i)) {
-        expectedResults[i] = _intSVValues[i];
-      } else {
+      if (i % 2 == 0) {
         expectedResults[i] = _intSVValues[i];
-      }
-    }
-    testLongTransformFunction(coalesceExpr, expectedResults, 
_enableNullProjectionBlock, _enableNullDataSourceMap);
-    testLongTransformFunction(coalesceExpr, expectedResults, 
_disableNullProjectionBlock, _disableNullDataSourceMap);
-  }
-
-  // Test the Coalesce on two float columns where one or the other or both can 
be null.
-  @Test
-  public void testCoalesceFloatColumns()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", 
FLOAT_SV_COLUMN1, FLOAT_SV_COLUMN2));
-    TransformFunction coalesceTransformFunction = 
TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce");
-    float[] expectedResults = new float[NUM_ROWS];
-    for (int i = 0; i < NUM_ROWS; i++) {
-      if (isColumn1Null(i) && isColumn2Null(i)) {
-        expectedResults[i] = CoalesceTransformFunction.NULL_FLOAT;
-      } else if (isColumn1Null(i)) {
-        expectedResults[i] = _floatValues[i] + FLOAT_VALUE_SHIFT;
-      } else if (isColumn2Null(i)) {
-        expectedResults[i] = _floatValues[i];
-      } else {
-        expectedResults[i] = _floatValues[i];
-      }
-    }
-    testFloatTransformFunction(coalesceExpr, expectedResults, 
_enableNullProjectionBlock, _enableNullDataSourceMap);
-    testFloatTransformFunction(coalesceExpr, expectedResults, 
_disableNullProjectionBlock, _disableNullDataSourceMap);
-  }
-
-  // Test the Coalesce on two double columns where one or the other or both 
can be null.
-  @Test
-  public void testCoalesceDoubleColumns()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", 
DOUBLE_SV_COLUMN1, DOUBLE_SV_COLUMN2));
-    TransformFunction coalesceTransformFunction = 
TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce");
-    double[] expectedResults = new double[NUM_ROWS];
-    for (int i = 0; i < NUM_ROWS; i++) {
-      if (isColumn1Null(i) && isColumn2Null(i)) {
-        expectedResults[i] = CoalesceTransformFunction.NULL_DOUBLE;
-      } else if (isColumn1Null(i)) {
-        expectedResults[i] = _doubleValues[i] + DOUBLE_VALUE_SHIFT;
-      } else if (isColumn2Null(i)) {
-        expectedResults[i] = _doubleValues[i];
       } else {
-        expectedResults[i] = _doubleValues[i];
+        expectedResults[i] = _longSVValues[i];
       }
     }
-    testDoubleTransformFunction(coalesceExpr, expectedResults, 
_enableNullProjectionBlock, _enableNullDataSourceMap);
-    testDoubleTransformFunction(coalesceExpr, expectedResults, 
_disableNullProjectionBlock, _disableNullDataSourceMap);
+    testTransformFunction(coalesceFunc, expectedResults);
   }
 
-  // Test the Coalesce on two big decimal columns where one or the other or 
both can be null.
   @Test
-  public void testCoalesceBigDecimalColumns()
-      throws Exception {
-    ExpressionContext coalesceExpr = RequestContextUtils.getExpression(
-        String.format("COALESCE(%s,%s)", BIG_DECIMAL_SV_COLUMN1, 
BIG_DECIMAL_SV_COLUMN2));
-    TransformFunction coalesceTransformFunction = 
TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce");
-    BigDecimal[] expectedResults = new BigDecimal[NUM_ROWS];
+  public void testCoalesceIntColumnsAndLiterals() {
+    TransformFunction coalesceFunc = TransformFunctionFactory.get(
+        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", 
INT_SV_NULL_COLUMN, 314)),

Review Comment:
   nit: avoid magic number by defining a variable.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java:
##########
@@ -121,20 +132,23 @@ private void 
constructStatementListLegacy(List<TransformFunction> arguments) {
     for (int i = 0; i < numWhenStatements; i++) {
       _whenStatements.add(arguments.get(i));
     }
-    for (int i = numWhenStatements; i < numWhenStatements * 2 + 1; i++) {
-      _elseThenStatements.add(arguments.get(i));
+    for (int i = numWhenStatements; i < numWhenStatements * 2; i++) {
+      _thenStatements.add(arguments.get(i));
+    }
+    if (arguments.size() % 2 != 0) {
+      _elseStatement = arguments.get(arguments.size() - 1);
     }
   }
 
   private TransformResultMetadata calculateResultMetadata() {
-    TransformFunction elseStatement = _elseThenStatements.get(0);
+    TransformFunction elseStatement = _elseStatement;

Review Comment:
   We can simply use `_elseStatement`.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java:
##########
@@ -173,6 +190,7 @@ private TransformResultMetadata calculateResultMetadata() {
         case LONG:
           switch (thenStatementDataType) {
             case INT:
+            case UNKNOWN:

Review Comment:
   Put `case UNKONWN` above `default` for consistency?



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java:
##########
@@ -241,226 +266,768 @@ public TransformResultMetadata getResultMetadata() {
   }
 
   /**
-   * Evaluate the ValueBlock for the WHEN statements, returns an array with the
-   * index(1 to N) of matched WHEN clause ordered by match priority, 0 means 
nothing
-   * matched, so go to ELSE.
+   * Evaluate the ValueBlock for the WHEN statements, returns an array with 
the index(1 to N) of matched WHEN clause
+   * -1 means there is no match.
    */
-  private int[] getSelectedArray(ValueBlock valueBlock) {
+  private int[] getSelectedArray(ValueBlock valueBlock, boolean 
nullHandlingEnabled) {
     int numDocs = valueBlock.getNumDocs();
     if (_selectedResults == null || _selectedResults.length < numDocs) {
       _selectedResults = new int[numDocs];
-    } else {
-      Arrays.fill(_selectedResults, 0, numDocs, 0);
-      Arrays.fill(_selections, false);
     }
+    Arrays.fill(_selectedResults, 0, numDocs, -1);

Review Comment:
   Arrays.fill(_selectedResults, -1);



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java:
##########
@@ -241,226 +266,768 @@ public TransformResultMetadata getResultMetadata() {
   }
 
   /**
-   * Evaluate the ValueBlock for the WHEN statements, returns an array with the
-   * index(1 to N) of matched WHEN clause ordered by match priority, 0 means 
nothing
-   * matched, so go to ELSE.
+   * Evaluate the ValueBlock for the WHEN statements, returns an array with 
the index(1 to N) of matched WHEN clause
+   * -1 means there is no match.
    */
-  private int[] getSelectedArray(ValueBlock valueBlock) {
+  private int[] getSelectedArray(ValueBlock valueBlock, boolean 
nullHandlingEnabled) {
     int numDocs = valueBlock.getNumDocs();
     if (_selectedResults == null || _selectedResults.length < numDocs) {
       _selectedResults = new int[numDocs];
-    } else {
-      Arrays.fill(_selectedResults, 0, numDocs, 0);
-      Arrays.fill(_selections, false);
     }
+    Arrays.fill(_selectedResults, 0, numDocs, -1);
+    Arrays.fill(_selections, false);
+    BitSet unselectedDocs = new BitSet();
+    unselectedDocs.set(0, numDocs);
     int numWhenStatements = _whenStatements.size();
-    for (int i = numWhenStatements - 1; i >= 0; i--) {
+    for (int i = 0; i < numWhenStatements; i++) {
       TransformFunction whenStatement = _whenStatements.get(i);
-      int[] conditions = whenStatement.transformToIntValuesSV(valueBlock);
-      for (int j = 0; j < numDocs & j < conditions.length; j++) {
-        _selectedResults[j] = Math.max(conditions[j] * (i + 1), 
_selectedResults[j]);
+      int[] conditions = getWhenConditions(whenStatement, valueBlock, 
nullHandlingEnabled);
+      for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = 
unselectedDocs.nextSetBit(docId + 1)) {
+        if (conditions[docId] == 1) {
+          unselectedDocs.clear(docId);
+          _selectedResults[docId] = i;
+        }
+      }
+      if (unselectedDocs.isEmpty()) {
+        break;
       }
     }
     // try to prune clauses now
     for (int i = 0; i < numDocs; i++) {
-      _selections[_selectedResults[i]] = true;
-    }
-    int numSelections = 0;
-    for (boolean selection : _selections) {
-      if (selection) {
-        numSelections++;
+      if (_selectedResults[i] != -1) {
+        _selections[_selectedResults[i]] = true;
       }
     }
-    _numSelections = numSelections;
     return _selectedResults;
   }
 
+  // Returns an array of valueBlock length to indicate whether a row is 
selected or not.
+  // When nullHandlingEnabled is set to true, we also check whether the row is 
null and set to false if null.
+  private static int[] getWhenConditions(TransformFunction whenStatement, 
ValueBlock valueBlock,
+      boolean nullHandlingEnabled) {
+    if (!nullHandlingEnabled) {
+      return whenStatement.transformToIntValuesSV(valueBlock);
+    }
+    Pair<int[], RoaringBitmap> result = 
whenStatement.transformToIntValuesSVWithNull(valueBlock);
+    RoaringBitmap bitmap = result.getRight();
+    int[] intResult = result.getLeft();
+    if (bitmap != null) {
+      bitmap.forEach((IntConsumer) (i) -> {
+        if (bitmap.contains(i)) {
+          intResult[i] = 0;
+        }
+      });
+    }
+    return intResult;
+  }
+
   @Override
   public int[] transformToIntValuesSV(ValueBlock valueBlock) {
     if (_resultMetadata.getDataType().getStoredType() != DataType.INT) {
       return super.transformToIntValuesSV(valueBlock);
     }
-    int[] selected = getSelectedArray(valueBlock);
+    int[] selected = getSelectedArray(valueBlock, false);
     int numDocs = valueBlock.getNumDocs();
     initIntValuesSV(numDocs);
-    int numElseThenStatements = _elseThenStatements.size();
-    for (int i = 0; i < numElseThenStatements; i++) {
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet();
+    unselectedDocs.set(0, numDocs);
+    for (int i = 0; i < numThenStatements; i++) {
       if (_selections[i]) {
-        TransformFunction transformFunction = _elseThenStatements.get(i);
+        TransformFunction transformFunction = _thenStatements.get(i);
         int[] intValues = transformFunction.transformToIntValuesSV(valueBlock);
-        if (_numSelections == 1) {
-          System.arraycopy(intValues, 0, _intValuesSV, 0, numDocs);
-        } else {
-          for (int j = 0; j < numDocs; j++) {
-            if (selected[j] == i) {
-              _intValuesSV[j] = intValues[j];
-            }
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = 
unselectedDocs.nextSetBit(docId + 1)) {

Review Comment:
   If we compute all the `_thenStatements` and store the values in a 
Map<Integer, int[]>, we can iterate over `unselectedDocs` only once.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java:
##########
@@ -53,14 +58,19 @@
  *    <code>THEN_EXPRESSION_$i</code> is a <code>TransformFunction</code> 
represents <code>result$i</code>
  *    <code>ELSE_EXPRESSION</code> is a <code>TransformFunction</code> 
represents <code>result</code>
  *
+ * ELSE_EXPRESSION can be omitted.
+ * When none of when statements is evaluated to be true, and there is no else 
expression, we output null.
+ * Note that when statement is considered as false if it is evaluated to be 
null.
+ *
  */
 public class CaseTransformFunction extends BaseTransformFunction {
   public static final String FUNCTION_NAME = "case";
 
   private List<TransformFunction> _whenStatements = new ArrayList<>();
-  private List<TransformFunction> _elseThenStatements = new ArrayList<>();
+  private List<TransformFunction> _thenStatements = new ArrayList<>();
+  private TransformFunction _elseStatement;
+
   private boolean[] _selections;

Review Comment:
   nit: s/_selections/_computeThenStatements for better readability.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunction.java:
##########
@@ -116,24 +108,24 @@ private int[] getIntTransformResults(ValueBlock 
valueBlock) {
     int width = _transformFunctions.length;
     RoaringBitmap[] nullBitMaps = getNullBitMaps(valueBlock, 
_transformFunctions);
     int[][] data = new int[width][length];
-    RoaringBitmap filledData = new RoaringBitmap();
+    BitSet filledData = new BitSet(); // indicates whether certain column has 
be filled in data.
     for (int i = 0; i < length; i++) {
       boolean hasNonNullValue = false;
       for (int j = 0; j < width; j++) {
         // Consider value as null only when null option is enabled.
         if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) {
           continue;
         }
-        if (!filledData.contains(j)) {
-          filledData.add(j);
+        if (!filledData.get(j)) {
+          filledData.set(j);
           data[j] = _transformFunctions[j].transformToIntValuesSV(valueBlock);
         }

Review Comment:
   ```
   int[][] data = new int[width][];
   for (int i = 0; i < length; i++) {
     boolean hasNonNullValue = false;
     for (int j = 0; j < width; j++) {
       // Consider value as null only when null option is enabled.
       if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) {
         continue;
       }
       if (data[j] == null) {
         data[j] = _transformFunctions[j].transformToIntValuesSV(valueBlock);
       }
   ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to