JinkunLiu commented on code in PR #27330:
URL: https://github.com/apache/flink/pull/27330#discussion_r2744563056


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala:
##########
@@ -1668,6 +1668,100 @@ object ScalarOperatorGens {
     generateUnaryOperatorIfNotNull(ctx, resultType, map)(_ => 
s"${map.resultTerm}.size()")
   }
 
+  def generateVariantGet(
+      ctx: CodeGeneratorContext,
+      variant: GeneratedExpression,
+      key: GeneratedExpression): GeneratedExpression = {
+    val Seq(resultTerm, nullTerm) = newNames(ctx, "result", "isNull")
+    val tmpValue = newName(ctx, "tmpValue")
+
+    val variantType = variant.resultType.asInstanceOf[VariantType]
+    val variantTerm = variant.resultTerm
+    val variantTypeTerm = primitiveTypeTermForType(variantType)
+    val variantDefaultTerm = primitiveDefaultValue(variantType)
+
+    val keyTerm = key.resultTerm
+    val keyType = key.resultType
+
+    val accessCode = if (isIntegerNumeric(keyType)) {
+      generateIntegerKeyAccess(
+        variantTerm,
+        variantTypeTerm,
+        keyTerm,
+        resultTerm,
+        nullTerm,
+        tmpValue
+      )
+    } else if (isCharacterString(keyType)) {
+      val fieldName = key.literalValue.get.toString
+      generateCharacterStringKeyAccess(
+        variantTerm,
+        variantTypeTerm,
+        fieldName,
+        resultTerm,
+        nullTerm,
+        tmpValue
+      )
+    } else {
+      throw new CodeGenException(s"Unsupported key type for variant: $keyType")
+    }
+
+    val finalCode =
+      s"""
+         |${variant.code}
+         |${key.code}
+         |boolean $nullTerm = (${variant.nullTerm} || ${key.nullTerm});
+         |$variantTypeTerm $resultTerm = $variantDefaultTerm;
+         |if (!$nullTerm) {
+         |  $accessCode
+         |}
+      """.stripMargin
+
+    GeneratedExpression(resultTerm, nullTerm, finalCode, variantType)
+  }
+
+  private def generateCharacterStringKeyAccess(
+      variantTerm: String,
+      variantTypeTerm: String,
+      fieldName: String,
+      resultTerm: String,
+      nullTerm: String,
+      tmpValue: String): String = {
+    s"""
+       |  if ($variantTerm.isObject()){
+       |    $variantTypeTerm $tmpValue = $variantTerm.getField("$fieldName");
+       |    if ($tmpValue == null) {
+       |      $nullTerm = true;
+       |    } else {
+       |      $resultTerm = $tmpValue;
+       |    }
+       |  } else {
+       |    throw new org.apache.flink.table.api.TableRuntimeException("String 
key access on variant requires an object variant, but a non-object variant was 
provided.");
+       |  }
+    """.stripMargin
+  }
+
+  private def generateIntegerKeyAccess(
+      variantTerm: String,
+      variantTypeTerm: String,
+      keyTerm: String,
+      resultTerm: String,
+      nullTerm: String,
+      tmpValue: String): String = {
+    s"""
+       |  if ($variantTerm.isArray()){
+       |    $variantTypeTerm $tmpValue = $variantTerm.getElement((int) 
$keyTerm - 1);

Review Comment:
   Good suggestion! I have finished it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to