Jackie-Jiang commented on a change in pull request #5461:
URL: https://github.com/apache/incubator-pinot/pull/5461#discussion_r436121149



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LiteralTransformFunction.java
##########
@@ -75,22 +82,40 @@ public Dictionary getDictionary() {
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    throw new UnsupportedOperationException();
+    if (_intResult == null) {
+      _intResult = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+      Arrays.fill(_intResult, Double.valueOf(_literal).intValue());

Review comment:
       Can we follow the same convention as `ArrayCopyUtils` so that the 
behavior is consistent everywhere? (Use `Integer.parseInt`, `Long.parseLong`, 
`Float.parseFloat`, `Double.parseDouble`, `BytesUtils.toBytes`.)
   If the String is not in the format of the target type, the current behavior 
is to throw an exception.
   For this PR, I don't think you need to change this class, because the binary 
operator first checks the data type and always fetches the String value from 
the Literal. Ideally it should check for a Literal and call `getLiteral()`.
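
   To illustrate the difference between the two conventions, here is a minimal 
sketch using only JDK calls. The class and method names are hypothetical and 
are not part of the PR; it only contrasts the conversion in the diff above with 
the strict parse the comment asks for.

```java
final class LiteralParseSketch {
  // Conversion used in the diff above: parse as double, then truncate to int.
  static int viaDouble(String literal) {
    return Double.valueOf(literal).intValue();
  }

  // Strict convention named in the review: throws if the String is not an int.
  static int viaParseInt(String literal) {
    return Integer.parseInt(literal);
  }

  public static void main(String[] args) {
    // The double-based path silently drops the fractional part.
    System.out.println(viaDouble("1.9"));
    try {
      viaParseInt("1.9");
    } catch (NumberFormatException e) {
      // The strict path rejects a literal that is not in int format.
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

   With the strict parse, a literal such as "1.9" fails fast instead of being 
quietly truncated to 1.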

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
##########
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * The <code>CaseTransformFunction</code> class implements the 
CASE-WHEN-THEN-ELSE transformation.
+ *
+ * The SQL Syntax is:
+ *    CASE
+ *        WHEN condition1 THEN result1
+ *        WHEN condition2 THEN result2
+ *        WHEN conditionN THEN resultN
+ *        ELSE result
+ *    END;
+ *
+ * Usage:
+ *    case(${WHEN_STATEMENT_1}, ..., ${WHEN_STATEMENT_N},
+ *         ${THEN_EXPRESSION_1}, ..., ${THEN_EXPRESSION_N},
+ *         ${ELSE_EXPRESSION})
+ *
+ * There are 2 * N + 1 arguments:
+ *    <code>WHEN_STATEMENT_$i</code> is a 
<code>BinaryOperatorTransformFunction</code> represents <code>condition$i</code>
+ *    <code>THEN_EXPRESSION_$i</code> is a <code>TransformFunction</code> 
represents <code>result$i</code>
+ *    <code>ELSE_EXPRESSION</code> is a <code>TransformFunction</code> 
represents <code>result</code>
+ *
+ */
+public class CaseTransformFunction extends BaseTransformFunction {
+  public static final String FUNCTION_NAME = "case";
+  private final List<TransformFunction> _whenStatements = new ArrayList<>();
+  private final List<TransformFunction> _elseThenStatements = new 
ArrayList<>();
+  private int _numberWhenStatements;
+  private TransformResultMetadata _resultMetadata;
+
+  @Override
+  public String getName() {
+    return FUNCTION_NAME;
+  }
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> 
dataSourceMap) {
+    // Check that there are more than 1 arguments
+    if (arguments.size() % 2 != 1 || arguments.size() < 3) {
+      throw new IllegalArgumentException("At least 3 odd number of arguments 
are required for CASE-WHEN-ELSE function");
+    }
+    _numberWhenStatements = arguments.size() / 2;
+    for (int i = 0; i < _numberWhenStatements; i++) {
+      _whenStatements.add(arguments.get(i));
+    }
+    // Add ELSE Statement first
+    _elseThenStatements.add(arguments.get(_numberWhenStatements * 2));
+    for (int i = _numberWhenStatements; i < _numberWhenStatements * 2; i++) {
+      _elseThenStatements.add(arguments.get(i));
+    }
+    Preconditions.checkState(_elseThenStatements.size() == 
_numberWhenStatements + 1, "Missing THEN/ELSE clause in CASE statement");

Review comment:
       Actually, this check is redundant because you already check that the 
number of arguments is odd.
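
   A small sketch of why the size check can never fail once the argument count 
is known to be odd and at least 3. It mirrors the index arithmetic of `init()` 
on a plain list of Strings; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class CaseArgumentSplitSketch {
  // Returns {number of WHEN entries, number of ELSE/THEN entries}.
  static int[] splitSizes(List<String> arguments) {
    if (arguments.size() % 2 != 1 || arguments.size() < 3) {
      throw new IllegalArgumentException("An odd number of arguments (at least 3) is required");
    }
    int numWhen = arguments.size() / 2; // for 2N + 1 arguments this is N
    List<String> when = new ArrayList<>(arguments.subList(0, numWhen));
    List<String> elseThen = new ArrayList<>();
    elseThen.add(arguments.get(numWhen * 2)); // ELSE is the last argument
    elseThen.addAll(arguments.subList(numWhen, numWhen * 2)); // the N THEN arguments
    return new int[] {when.size(), elseThen.size()};
  }

  public static void main(String[] args) {
    int[] sizes = splitSizes(List.of("w1", "w2", "t1", "t2", "else"));
    System.out.println(sizes[0] + " WHEN, " + sizes[1] + " ELSE/THEN");
  }
}
```

   For 2N + 1 arguments the second list always holds 1 + N entries, so the 
`checkState` condition holds by construction.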

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
##########
@@ -0,0 +1,484 @@
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> 
dataSourceMap) {
+    // Check that there are more than 1 arguments
+    if (arguments.size() % 2 != 1 || arguments.size() < 3) {
+      throw new IllegalArgumentException("At least 3 odd number of arguments 
are required for CASE-WHEN-ELSE function");
+    }
+    _numberWhenStatements = arguments.size() / 2;
+    for (int i = 0; i < _numberWhenStatements; i++) {
+      _whenStatements.add(arguments.get(i));
+    }
+    // Add ELSE Statement first
+    _elseThenStatements.add(arguments.get(_numberWhenStatements * 2));
+    for (int i = _numberWhenStatements; i < _numberWhenStatements * 2; i++) {
+      _elseThenStatements.add(arguments.get(i));
+    }
+    Preconditions.checkState(_elseThenStatements.size() == 
_numberWhenStatements + 1, "Missing THEN/ELSE clause in CASE statement");
+    _resultMetadata = getResultMetadata(_elseThenStatements);
+  }
+
+  private TransformResultMetadata getResultMetadata(List<TransformFunction> 
elseThenStatements) {
+    FieldSpec.DataType dataType = 
elseThenStatements.get(0).getResultMetadata().getDataType();
+    for (int i = 1; i < elseThenStatements.size(); i++) {
+      TransformResultMetadata resultMetadata = 
elseThenStatements.get(i).getResultMetadata();
+      if (!resultMetadata.isSingleValue()) {

Review comment:
       We can support MV, right? I think we could add a limitation of not 
supporting mixed SV and MV, or for now not allow MV at all. Maybe add some 
comments and a TODO?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
##########
@@ -0,0 +1,484 @@
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> 
dataSourceMap) {
+    // Check that there are more than 1 arguments
+    if (arguments.size() % 2 != 1 || arguments.size() < 3) {
+      throw new IllegalArgumentException("At least 3 odd number of arguments 
are required for CASE-WHEN-ELSE function");
+    }
+    _numberWhenStatements = arguments.size() / 2;
+    for (int i = 0; i < _numberWhenStatements; i++) {
+      _whenStatements.add(arguments.get(i));
+    }
+    // Add ELSE Statement first
+    _elseThenStatements.add(arguments.get(_numberWhenStatements * 2));
+    for (int i = _numberWhenStatements; i < _numberWhenStatements * 2; i++) {
+      _elseThenStatements.add(arguments.get(i));
+    }
+    Preconditions.checkState(_elseThenStatements.size() == 
_numberWhenStatements + 1, "Missing THEN/ELSE clause in CASE statement");
+    _resultMetadata = getResultMetadata(_elseThenStatements);
+  }
+
+  private TransformResultMetadata getResultMetadata(List<TransformFunction> 
elseThenStatements) {
+    FieldSpec.DataType dataType = 
elseThenStatements.get(0).getResultMetadata().getDataType();
+    for (int i = 1; i < elseThenStatements.size(); i++) {
+      TransformResultMetadata resultMetadata = 
elseThenStatements.get(i).getResultMetadata();
+      if (!resultMetadata.isSingleValue()) {
+        throw new IllegalStateException(
+            String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+      }
+      switch (dataType) {
+        case INT:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case STRING:
+              dataType = resultMetadata.getDataType();
+              break;
+            default:
+              throw new IllegalStateException(
+                  String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+          break;
+        case LONG:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case LONG:
+              break;
+            case FLOAT:
+            case DOUBLE:
+              dataType = FieldSpec.DataType.DOUBLE;
+              break;
+            case STRING:
+              dataType = FieldSpec.DataType.STRING;
+              break;
+            default:
+              throw new IllegalStateException(
+                  String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+          break;
+        case FLOAT:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case FLOAT:
+              break;
+            case LONG:
+            case DOUBLE:
+              dataType = FieldSpec.DataType.DOUBLE;
+              break;
+            case STRING:
+              dataType = resultMetadata.getDataType();

Review comment:
       ```suggestion
                 dataType = FieldSpec.DataType.STRING;
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
##########
@@ -0,0 +1,484 @@
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> 
dataSourceMap) {
+    // Check that there are more than 1 arguments
+    if (arguments.size() % 2 != 1 || arguments.size() < 3) {
+      throw new IllegalArgumentException("At least 3 odd number of arguments 
are required for CASE-WHEN-ELSE function");
+    }
+    _numberWhenStatements = arguments.size() / 2;
+    for (int i = 0; i < _numberWhenStatements; i++) {
+      _whenStatements.add(arguments.get(i));
+    }
+    // Add ELSE Statement first
+    _elseThenStatements.add(arguments.get(_numberWhenStatements * 2));
+    for (int i = _numberWhenStatements; i < _numberWhenStatements * 2; i++) {
+      _elseThenStatements.add(arguments.get(i));
+    }
+    Preconditions.checkState(_elseThenStatements.size() == 
_numberWhenStatements + 1, "Missing THEN/ELSE clause in CASE statement");
+    _resultMetadata = getResultMetadata(_elseThenStatements);
+  }
+
+  private TransformResultMetadata getResultMetadata(List<TransformFunction> 
elseThenStatements) {

Review comment:
       (nit) No need to pass in the argument; the method can read the 
`_elseThenStatements` field directly.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
##########
@@ -0,0 +1,482 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * The <code>CaseTransformFunction</code> class implements the 
CASE-WHEN-THEN-ELSE transformation.
+ *
+ * The SQL Syntax is:
+ *    CASE
+ *        WHEN condition1 THEN result1
+ *        WHEN condition2 THEN result2
+ *        WHEN conditionN THEN resultN
+ *        ELSE result
+ *    END;
+ *
+ * Usage:
+ *    case(${WHEN_STATEMENT_1}, ..., ${WHEN_STATEMENT_N},
+ *         ${THEN_EXPRESSION_1}, ..., ${THEN_EXPRESSION_N},
+ *         ${ELSE_EXPRESSION})
+ *
+ * There are 2 * N + 1 arguments:
+ *    <code>WHEN_STATEMENT_$i</code> is a 
<code>BinaryOperatorTransformFunction</code> represents <code>condition$i</code>
+ *    <code>THEN_EXPRESSION_$i</code> is a <code>TransformFunction</code> 
represents <code>result$i</code>
+ *    <code>ELSE_EXPRESSION</code> is a <code>TransformFunction</code> 
represents <code>result</code>
+ *
+ */
+public class CaseTransformFunction extends BaseTransformFunction {
+  public static final String FUNCTION_NAME = "case";
+  private final List<TransformFunction> _whenStatements = new ArrayList<>();
+  private final List<TransformFunction> _elseThenStatements = new 
ArrayList<>();
+  private int _numberWhenStatements;
+  private TransformResultMetadata _resultMetadata;
+
+  @Override
+  public String getName() {
+    return FUNCTION_NAME;
+  }
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> 
dataSourceMap) {
+    // Check that there are more than 1 arguments
+    if (arguments.size() % 2 != 1 || arguments.size() < 3) {
+      throw new IllegalArgumentException("At least 3 odd number of arguments 
are required for CASE-WHEN-ELSE function");
+    }
+    _numberWhenStatements = arguments.size() / 2;
+    for (int i = 0; i < _numberWhenStatements; i++) {
+      _whenStatements.add(arguments.get(i));
+    }
+    // Add ELSE Statement first
+    _elseThenStatements.add(arguments.get(_numberWhenStatements * 2));
+    for (int i = _numberWhenStatements; i < _numberWhenStatements * 2; i++) {
+      _elseThenStatements.add(arguments.get(i));
+    }
+    _resultMetadata = getResultMetadata(_elseThenStatements);
+  }
+
+  private TransformResultMetadata getResultMetadata(List<TransformFunction> 
elseThenStatements) {
+    FieldSpec.DataType dataType = 
elseThenStatements.get(0).getResultMetadata().getDataType();
+    for (int i = 1; i < elseThenStatements.size(); i++) {
+      TransformResultMetadata resultMetadata = 
elseThenStatements.get(i).getResultMetadata();
+      if (!resultMetadata.isSingleValue()) {
+        throw new IllegalStateException(
+            String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+      }
+      switch (dataType) {
+        case INT:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case STRING:
+              dataType = resultMetadata.getDataType();
+              break;
+            default:
+              throw new IllegalStateException(
+                  String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+          break;
+        case LONG:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case LONG:
+              break;
+            case FLOAT:
+            case DOUBLE:
+              dataType = FieldSpec.DataType.DOUBLE;
+              break;
+            case STRING:
+              dataType = FieldSpec.DataType.STRING;
+              break;
+            default:
+              throw new IllegalStateException(
+                  String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+          break;
+        case FLOAT:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case FLOAT:
+              break;
+            case LONG:
+            case DOUBLE:
+              dataType = FieldSpec.DataType.DOUBLE;
+              break;
+            case STRING:
+              dataType = resultMetadata.getDataType();
+              break;
+            default:
+              throw new IllegalStateException(
+                  String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+          break;
+        case DOUBLE:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case FLOAT:
+            case LONG:
+            case DOUBLE:
+              break;
+            case STRING:
+              dataType = resultMetadata.getDataType();
+              break;
+            default:
+              throw new IllegalStateException(
+                  String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+          break;
+        case STRING:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case FLOAT:
+            case LONG:
+            case DOUBLE:
+            case STRING:
+              break;
+            default:
+              throw new IllegalStateException(
+                  String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+          break;
+        default:
+          if (resultMetadata.getDataType() != dataType) {
+            throw new IllegalStateException(
+                String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+      }
+    }
+    return new TransformResultMetadata(dataType, true, false);
+  }
+
+  @Override
+  public TransformResultMetadata getResultMetadata() {
+    return _resultMetadata;
+  }
+
+  private int[] getSelectedArray(ProjectionBlock projectionBlock) {
+    int[] selected = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];

Review comment:
       +1 for some comments. The algorithm is smart, but not very intuitive.
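
   The body of `getSelectedArray` is not visible in this hunk, so the following 
is only a guess at the kind of commented loop that would help readers. It 
assumes that each WHEN result is an int array of 1/0 flags and that the 
returned value indexes into `_elseThenStatements` (ELSE at 0, THEN i at i + 1). 
The class, method, and parameter names are hypothetical.

```java
final class SelectedArraySketch {
  /**
   * @param whenResults whenResults[i][doc] is 1 if the i-th WHEN condition holds for doc, else 0
   * @param numDocs number of documents in the block
   * @return selected[doc] is 0 for ELSE, or i + 1 for the first WHEN that matched
   */
  static int[] selectBranches(int[][] whenResults, int numDocs) {
    // All zeros initially: every document falls through to ELSE by default.
    int[] selected = new int[numDocs];
    for (int i = 0; i < whenResults.length; i++) {
      for (int doc = 0; doc < numDocs; doc++) {
        // Only the first matching WHEN wins, so skip documents already assigned.
        if (selected[doc] == 0 && whenResults[i][doc] == 1) {
          selected[doc] = i + 1;
        }
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    int[][] whenResults = {{1, 0, 0}, {1, 1, 0}};
    int[] selected = selectBranches(whenResults, 3);
    System.out.println(java.util.Arrays.toString(selected));
  }
}
```

   Under these assumptions, a short Javadoc stating the meaning of each value 
in the returned array would already cover most of what is not intuitive.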

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunction.java
##########
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * <code>BinaryOperatorTransformFunction</code> abstracts common functions for 
binary operators (=, !=, >=, >, <=, <)
+ * The results are in boolean format and stored as an integer array with 1 
represents true and 0 represents false.
+ */
+public abstract class BinaryOperatorTransformFunction extends 
BaseTransformFunction {
+
+  protected TransformFunction _leftTransformFunction;
+  protected TransformFunction _rightTransformFunction;
+  protected FieldSpec.DataType _leftDataType;
+  protected FieldSpec.DataType _rightDataType;
+  protected int[] _results;
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> 
dataSourceMap) {
+    // Check that there are exact 2 arguments
+    Preconditions
+        .checkArgument(arguments.size() == 2, "Exact 2 arguments are required 
for binary operator transform function");
+    _leftTransformFunction = arguments.get(0);
+    _rightTransformFunction = arguments.get(1);
+    _leftDataType = _leftTransformFunction.getResultMetadata().getDataType();
+    _rightDataType = _rightTransformFunction.getResultMetadata().getDataType();
+    // Data type check: left and right types should be compatible.
+    if (_leftDataType == FieldSpec.DataType.BYTES || _rightDataType == 
FieldSpec.DataType.BYTES) {
+      Preconditions.checkState(_leftDataType == FieldSpec.DataType.BYTES && 
_rightDataType == FieldSpec.DataType.BYTES,
+          String.format(
+              "Unsupported data type for comparison: [Left Transform Function 
[%s] result type is [%s], Right Transform Function [%s] result type is [%s]]",
+              _leftTransformFunction.getName(), _leftDataType, 
_rightTransformFunction.getName(), _rightDataType));
+    }
+  }
+
+  @Override
+  public TransformResultMetadata getResultMetadata() {
+    return INT_SV_NO_DICTIONARY_METADATA;
+  }
+
+  protected void fillResultArray(ProjectionBlock projectionBlock) {
+    if (_results == null) {
+      _results = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    }
+    int length = projectionBlock.getNumDocs();
+    switch (_leftDataType) {
+      case INT:
+        int[] leftIntValues = 
_leftTransformFunction.transformToIntValuesSV(projectionBlock);
+        switch (_rightDataType) {
+          case INT:
+            int[] rightIntValues = 
_rightTransformFunction.transformToIntValuesSV(projectionBlock);
+            for (int i = 0; i < length; i++) {
+              _results[i] = 
getBinaryFuncResult(Integer.compare(leftIntValues[i], rightIntValues[i]));

Review comment:
       @fx19880617 The TransformFunction itself can handle the data type 
conversion (see `BaseTransformFunction`, which is the middle layer you 
mentioned). You can use the left and right data types to decide which value 
type to transform to, and then compare directly.
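
   A rough sketch of that idea, restricted to numeric types. `Values` is a 
hypothetical stand-in for a transform function that converts on request (it is 
backed by doubles purely for brevity), and the rule "both integral: compare as 
long, otherwise compare as double" is one assumed policy, not necessarily the 
one the PR should adopt. The operator-specific mapping of the compare result 
is left out.

```java
final class BinaryCompareSketch {
  enum DataType { INT, LONG, FLOAT, DOUBLE }

  /** Hypothetical stand-in for a transform function that converts its values on request. */
  static final class Values {
    final DataType type;
    private final double[] raw;

    Values(DataType type, double... raw) {
      this.type = type;
      this.raw = raw;
    }

    long[] asLongs() {
      long[] out = new long[raw.length];
      for (int i = 0; i < raw.length; i++) {
        out[i] = (long) raw[i];
      }
      return out;
    }

    double[] asDoubles() {
      return raw.clone();
    }
  }

  static boolean isIntegral(DataType type) {
    return type == DataType.INT || type == DataType.LONG;
  }

  // Picks one comparison type from the two input types, then compares in that type.
  static int[] compare(Values left, Values right, int numDocs) {
    int[] results = new int[numDocs];
    if (isIntegral(left.type) && isIntegral(right.type)) {
      long[] l = left.asLongs();
      long[] r = right.asLongs();
      for (int i = 0; i < numDocs; i++) {
        results[i] = Long.compare(l[i], r[i]);
      }
    } else {
      double[] l = left.asDoubles();
      double[] r = right.asDoubles();
      for (int i = 0; i < numDocs; i++) {
        results[i] = Double.compare(l[i], r[i]);
      }
    }
    return results;
  }

  public static void main(String[] args) {
    Values left = new Values(DataType.INT, 1, 5, 3);
    Values right = new Values(DataType.LONG, 2, 5, 1);
    System.out.println(java.util.Arrays.toString(compare(left, right, 3)));
  }
}
```

   This replaces one nested switch per type pair with a single decision about 
the comparison type, leaving the conversion to the inputs.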

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
##########
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * The <code>CaseTransformFunction</code> class implements the 
CASE-WHEN-THEN-ELSE transformation.
+ *
+ * The SQL Syntax is:
+ *    CASE
+ *        WHEN condition1 THEN result1
+ *        WHEN condition2 THEN result2
+ *        WHEN conditionN THEN resultN
+ *        ELSE result
+ *    END;
+ *
+ * Usage:
+ *    case(${WHEN_STATEMENT_1}, ..., ${WHEN_STATEMENT_N},
+ *         ${THEN_EXPRESSION_1}, ..., ${THEN_EXPRESSION_N},
+ *         ${ELSE_EXPRESSION})
+ *
+ * There are 2 * N + 1 arguments:
+ *    <code>WHEN_STATEMENT_$i</code> is a 
<code>BinaryOperatorTransformFunction</code> represents <code>condition$i</code>
+ *    <code>THEN_EXPRESSION_$i</code> is a <code>TransformFunction</code> 
represents <code>result$i</code>
+ *    <code>ELSE_EXPRESSION</code> is a <code>TransformFunction</code> 
represents <code>result</code>
+ *
+ */
+public class CaseTransformFunction extends BaseTransformFunction {
+  public static final String FUNCTION_NAME = "case";
+  private final List<TransformFunction> _whenStatements = new ArrayList<>();
+  private final List<TransformFunction> _elseThenStatements = new 
ArrayList<>();
+  private int _numberWhenStatements;
+  private TransformResultMetadata _resultMetadata;
+
+  @Override
+  public String getName() {
+    return FUNCTION_NAME;
+  }
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> 
dataSourceMap) {
+    // Check that there are more than 1 arguments
+    if (arguments.size() % 2 != 1 || arguments.size() < 3) {
+      throw new IllegalArgumentException("At least 3 odd number of arguments 
are required for CASE-WHEN-ELSE function");
+    }
+    _numberWhenStatements = arguments.size() / 2;
+    for (int i = 0; i < _numberWhenStatements; i++) {
+      _whenStatements.add(arguments.get(i));
+    }
+    // Add ELSE Statement first
+    _elseThenStatements.add(arguments.get(_numberWhenStatements * 2));
+    for (int i = _numberWhenStatements; i < _numberWhenStatements * 2; i++) {
+      _elseThenStatements.add(arguments.get(i));
+    }
+    Preconditions.checkState(_elseThenStatements.size() == 
_numberWhenStatements + 1, "Missing THEN/ELSE clause in CASE statement");
+    _resultMetadata = getResultMetadata(_elseThenStatements);
+  }
+
+  private TransformResultMetadata getResultMetadata(List<TransformFunction> elseThenStatements) {
+    FieldSpec.DataType dataType = elseThenStatements.get(0).getResultMetadata().getDataType();
+    for (int i = 1; i < elseThenStatements.size(); i++) {
+      TransformResultMetadata resultMetadata = elseThenStatements.get(i).getResultMetadata();
+      if (!resultMetadata.isSingleValue()) {

Review comment:
       Also, the single-value check for the first statement is missing: the 
loop starts at index 1, so `elseThenStatements.get(0)` is never checked.
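
A minimal sketch of one way to close that gap, assuming the single-value check 
is pulled into its own pass that starts at index 0. `Meta` and 
`checkAllSingleValue` are hypothetical names used only for illustration:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins: none of these names come from Pinot.
final class SingleValueCheckSketch {
  /** Stand-in for the result metadata of one THEN/ELSE expression. */
  static final class Meta {
    final String name;
    final boolean singleValue;

    Meta(String name, boolean singleValue) {
      this.name = name;
      this.singleValue = singleValue;
    }
  }

  /** Validates every entry, including index 0 (the ELSE expression in the diff). */
  static void checkAllSingleValue(List<Meta> elseThenMetadata) {
    for (int i = 0; i < elseThenMetadata.size(); i++) {
      Meta meta = elseThenMetadata.get(i);
      if (!meta.singleValue) {
        throw new IllegalStateException(
            String.format("Multi-valued expression [%s] is not supported in CASE.", meta.name));
      }
    }
  }

  public static void main(String[] args) {
    // Passes: both expressions are single-valued.
    checkAllSingleValue(Arrays.asList(new Meta("else", true), new Meta("then1", true)));
    System.out.println("all single-valued");
  }
}
```

The type-widening loop can then keep starting at index 1, since index 0 only 
seeds the initial data type.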

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BinaryOperatorTransformFunction.java
##########
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * <code>BinaryOperatorTransformFunction</code> abstracts common functions for 
binary operators (=, !=, >=, >, <=, <)
+ * The results are in boolean format and stored as an integer array with 1 
represents true and 0 represents false.
+ */
+public abstract class BinaryOperatorTransformFunction extends 
BaseTransformFunction {
+
+  protected TransformFunction _leftTransformFunction;
+  protected TransformFunction _rightTransformFunction;
+  protected FieldSpec.DataType _leftDataType;
+  protected FieldSpec.DataType _rightDataType;
+  protected int[] _results;
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> 
dataSourceMap) {
+    // Check that there are exact 2 arguments
+    Preconditions
+        .checkArgument(arguments.size() == 2, "Exact 2 arguments are required 
for binary operator transform function");
+    _leftTransformFunction = arguments.get(0);
+    _rightTransformFunction = arguments.get(1);
+    _leftDataType = _leftTransformFunction.getResultMetadata().getDataType();
+    _rightDataType = _rightTransformFunction.getResultMetadata().getDataType();
+    // Data type check: left and right types should be compatible.
+    if (_leftDataType == FieldSpec.DataType.BYTES || _rightDataType == 
FieldSpec.DataType.BYTES) {
+      Preconditions.checkState(_leftDataType == FieldSpec.DataType.BYTES && 
_rightDataType == FieldSpec.DataType.BYTES,
+          String.format(
+              "Unsupported data type for comparison: [Left Transform Function 
[%s] result type is [%s], Right Transform Function [%s] result type is [%s]]",
+              _leftTransformFunction.getName(), _leftDataType, 
_rightTransformFunction.getName(), _rightDataType));
+    }
+  }
+
+  @Override
+  public TransformResultMetadata getResultMetadata() {
+    return INT_SV_NO_DICTIONARY_METADATA;
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+    fillResultArray(projectionBlock);
+    return _results;
+  }
+
+  protected void fillResultArray(ProjectionBlock projectionBlock) {
+    if (_results == null) {
+      _results = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    }
+    int length = projectionBlock.getNumDocs();
+    switch (_leftDataType) {
+      case INT:
+        int[] leftIntValues = _leftTransformFunction.transformToIntValuesSV(projectionBlock);
+        switch (_rightDataType) {
+          case INT:
+            int[] rightIntValues = _rightTransformFunction.transformToIntValuesSV(projectionBlock);
+            for (int i = 0; i < length; i++) {
+              _results[i] = getBinaryFuncResult(Integer.compare(leftIntValues[i], rightIntValues[i]));

Review comment:
       Also, why do we have the compare here? It makes the BinaryOperator 
unsuitable for functions other than comparison.
   For performance, we should pass the arrays into `getBinaryFuncResult`, with 
one `getBinaryFuncResult` overload for each data type.
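
A minimal sketch of the array-level shape, assuming one overload per data type 
on the operator itself. `BinaryOperatorSketch`, `GreaterThanSketch` and 
`fillResults` are hypothetical names standing in for the per-type 
`getBinaryFuncResult` variants; only the int and double overloads are shown:

```java
import java.util.Arrays;

// Hypothetical stand-ins: none of these names come from Pinot.
final class ArrayLevelOperatorSketch {
  /** The base class never calls compare(), so non-comparison operators fit too. */
  abstract static class BinaryOperatorSketch {
    abstract void fillResults(int[] left, int[] right, int length, int[] results);

    abstract void fillResults(double[] left, double[] right, int length, int[] results);
  }

  /** One concrete operator: each overload loops over whole arrays of its type. */
  static final class GreaterThanSketch extends BinaryOperatorSketch {
    @Override
    void fillResults(int[] left, int[] right, int length, int[] results) {
      for (int i = 0; i < length; i++) {
        results[i] = left[i] > right[i] ? 1 : 0;
      }
    }

    @Override
    void fillResults(double[] left, double[] right, int length, int[] results) {
      for (int i = 0; i < length; i++) {
        results[i] = left[i] > right[i] ? 1 : 0;
      }
    }
  }

  public static void main(String[] args) {
    int[] results = new int[3];
    new GreaterThanSketch().fillResults(new int[] {1, 5, 3}, new int[] {2, 5, 1}, 3, results);
    System.out.println(Arrays.toString(results));
  }
}
```

This moves the per-element virtual call out of the loop: the operator is 
dispatched once per block, and the inner loop works on primitives.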

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
##########
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * The <code>CaseTransformFunction</code> class implements the 
CASE-WHEN-THEN-ELSE transformation.
+ *
+ * The SQL Syntax is:
+ *    CASE
+ *        WHEN condition1 THEN result1
+ *        WHEN condition2 THEN result2
+ *        WHEN conditionN THEN resultN
+ *        ELSE result
+ *    END;
+ *
+ * Usage:
+ *    case(${WHEN_STATEMENT_1}, ..., ${WHEN_STATEMENT_N},
+ *         ${THEN_EXPRESSION_1}, ..., ${THEN_EXPRESSION_N},
+ *         ${ELSE_EXPRESSION})
+ *
+ * There are 2 * N + 1 arguments:
+ *    <code>WHEN_STATEMENT_$i</code> is a 
<code>BinaryOperatorTransformFunction</code> represents <code>condition$i</code>
+ *    <code>THEN_EXPRESSION_$i</code> is a <code>TransformFunction</code> 
represents <code>result$i</code>
+ *    <code>ELSE_EXPRESSION</code> is a <code>TransformFunction</code> 
represents <code>result</code>
+ *
+ */
+public class CaseTransformFunction extends BaseTransformFunction {
+  public static final String FUNCTION_NAME = "case";
+  private final List<TransformFunction> _whenStatements = new ArrayList<>();
+  private final List<TransformFunction> _elseThenStatements = new 
ArrayList<>();
+  private int _numberWhenStatements;
+  private TransformResultMetadata _resultMetadata;
+
+  @Override
+  public String getName() {
+    return FUNCTION_NAME;
+  }
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> 
dataSourceMap) {
+    // Check that there are more than 1 arguments
+    if (arguments.size() % 2 != 1 || arguments.size() < 3) {
+      throw new IllegalArgumentException("At least 3 odd number of arguments 
are required for CASE-WHEN-ELSE function");
+    }
+    _numberWhenStatements = arguments.size() / 2;
+    for (int i = 0; i < _numberWhenStatements; i++) {
+      _whenStatements.add(arguments.get(i));
+    }
+    // Add ELSE Statement first
+    _elseThenStatements.add(arguments.get(_numberWhenStatements * 2));
+    for (int i = _numberWhenStatements; i < _numberWhenStatements * 2; i++) {
+      _elseThenStatements.add(arguments.get(i));
+    }
+    Preconditions.checkState(_elseThenStatements.size() == 
_numberWhenStatements + 1, "Missing THEN/ELSE clause in CASE statement");
+    _resultMetadata = getResultMetadata(_elseThenStatements);
+  }
+
+  private TransformResultMetadata getResultMetadata(List<TransformFunction> 
elseThenStatements) {
+    FieldSpec.DataType dataType = 
elseThenStatements.get(0).getResultMetadata().getDataType();
+    for (int i = 1; i < elseThenStatements.size(); i++) {
+      TransformResultMetadata resultMetadata = 
elseThenStatements.get(i).getResultMetadata();
+      if (!resultMetadata.isSingleValue()) {
+        throw new IllegalStateException(
+            String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+      }
+      switch (dataType) {
+        case INT:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case STRING:
+              dataType = resultMetadata.getDataType();
+              break;
+            default:
+              throw new IllegalStateException(
+                  String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+          break;
+        case LONG:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case LONG:
+              break;
+            case FLOAT:
+            case DOUBLE:
+              dataType = FieldSpec.DataType.DOUBLE;
+              break;
+            case STRING:
+              dataType = FieldSpec.DataType.STRING;
+              break;
+            default:
+              throw new IllegalStateException(
+                  String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+          break;
+        case FLOAT:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case FLOAT:
+              break;
+            case LONG:
+            case DOUBLE:
+              dataType = FieldSpec.DataType.DOUBLE;
+              break;
+            case STRING:
+              dataType = resultMetadata.getDataType();
+              break;
+            default:
+              throw new IllegalStateException(
+                  String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+          break;
+        case DOUBLE:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case FLOAT:
+            case LONG:
+            case DOUBLE:
+              break;
+            case STRING:
+              dataType = resultMetadata.getDataType();
+              break;
+            default:
+              throw new IllegalStateException(
+                  String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+          break;
+        case STRING:
+          switch (resultMetadata.getDataType()) {
+            case INT:
+            case FLOAT:
+            case LONG:
+            case DOUBLE:
+            case STRING:
+              break;
+            default:
+              throw new IllegalStateException(
+                  String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+          break;
+        default:
+          if (resultMetadata.getDataType() != dataType) {
+            throw new IllegalStateException(
+                String.format("Incompatible expression types in THEN Clause 
[%s].", resultMetadata));
+          }
+      }
+    }
+    return new TransformResultMetadata(dataType, true, false);
+  }
+
+  @Override
+  public TransformResultMetadata getResultMetadata() {
+    return _resultMetadata;
+  }
+
+  private int[] getSelectedArray(ProjectionBlock projectionBlock) {
+    int[] selected = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    for (int i = 0; i < _numberWhenStatements; i++) {
+      TransformFunction transformFunction = _whenStatements.get(i);
+      int[] conditions = transformFunction.transformToIntValuesSV(projectionBlock);
+      for (int j = 0; j < conditions.length; j++) {
+        if (selected[j] == 0 && conditions[j] == 1) {
+          selected[j] = i + 1;
+        }
+      }
+    }
+    return selected;
+  }
+
+  @Override
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {

Review comment:
       Here you should check the result metadata first, and if the result type 
is not INT, directly call `super.transformToIntValuesSV()`, which will do the 
type casting. The same applies to the other methods. You don't need a separate 
switch inside the method.
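
A minimal sketch of that pattern, assuming a base class whose typed accessors 
fall back to casting from another type. `Base`, `CaseLike`, `toInts` and 
`toDoubles` are hypothetical names; the casting behavior attributed to the base 
class here is an assumption and not a description of `BaseTransformFunction`:

```java
import java.util.Arrays;

// Hypothetical stand-ins: none of these names come from Pinot.
final class DelegateToSuperSketch {
  enum DataType { INT, DOUBLE }

  /** Assumed base behavior: each accessor casts from the other type's values. */
  abstract static class Base {
    int[] toInts() {
      double[] doubles = toDoubles();
      int[] ints = new int[doubles.length];
      for (int i = 0; i < doubles.length; i++) {
        ints[i] = (int) doubles[i];
      }
      return ints;
    }

    double[] toDoubles() {
      int[] ints = toInts();
      double[] doubles = new double[ints.length];
      for (int i = 0; i < ints.length; i++) {
        doubles[i] = ints[i];
      }
      return doubles;
    }
  }

  /** Each override handles only its native type and defers everything else. */
  static final class CaseLike extends Base {
    private final DataType resultType;
    private final int[] intValues;
    private final double[] doubleValues;

    private CaseLike(DataType resultType, int[] intValues, double[] doubleValues) {
      this.resultType = resultType;
      this.intValues = intValues;
      this.doubleValues = doubleValues;
    }

    static CaseLike ofInts(int... values) {
      return new CaseLike(DataType.INT, values, null);
    }

    static CaseLike ofDoubles(double... values) {
      return new CaseLike(DataType.DOUBLE, null, values);
    }

    @Override
    int[] toInts() {
      if (resultType != DataType.INT) {
        return super.toInts(); // the base class does the cast
      }
      return intValues.clone();
    }

    @Override
    double[] toDoubles() {
      if (resultType != DataType.DOUBLE) {
        return super.toDoubles(); // the base class does the cast
      }
      return doubleValues.clone();
    }
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(CaseLike.ofDoubles(1.9, -2.5).toInts()));
  }
}
```

Each override then contains a single type guard plus the native path, with no 
nested switch over the result type.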



