JinkunLiu commented on code in PR #27330:
URL: https://github.com/apache/flink/pull/27330#discussion_r2673245798
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala:
##########
@@ -1668,6 +1668,108 @@ object ScalarOperatorGens {
generateUnaryOperatorIfNotNull(ctx, resultType, map)(_ =>
s"${map.resultTerm}.size()")
}
+ /**
+  * Generates the code for a key-based access on a VARIANT value.
+  *
+  * Integer keys are delegated to [[generateIntegerKeyAccess]], character-string keys to
+  * [[generateCharacterStringKeyAccess]]. The generated expression is NULL when either operand
+  * is NULL, when the access code flags it as NULL, or when the access itself yields a Java
+  * `null` reference.
+  *
+  * @param ctx
+  *   code generator context used to reserve unique term names
+  * @param variant
+  *   generated expression of the VARIANT operand
+  * @param key
+  *   generated expression of the key operand
+  * @return
+  *   generated expression whose result type is the VARIANT type of `variant`
+  * @throws CodeGenException
+  *   if the key type is neither an integer type nor a character string type
+  */
+ def generateVariantGet(
+     ctx: CodeGeneratorContext,
+     variant: GeneratedExpression,
+     key: GeneratedExpression): GeneratedExpression = {
+   val Seq(resultTerm, nullTerm) = newNames(ctx, "result", "isNull")
+   val binaryVariantTerm = newName(ctx, "binaryVariantTerm")
+
+   val variantType = variant.resultType.asInstanceOf[VariantType]
+   val keyType = key.resultType
+
+   val variantTypeTerm = primitiveTypeTermForType(variantType)
+   val variantDefault = primitiveDefaultValue(variantType)
+   val variantTerm = variant.resultTerm
+   val keyTerm = key.resultTerm
+
+   val accessCode = if (isInteger(keyType)) {
+     generateIntegerKeyAccess(
+       variantTerm,
+       binaryVariantTerm,
+       keyTerm,
+       resultTerm,
+       nullTerm
+     )
+   } else if (isCharacterString(keyType)) {
+     generateCharacterStringKeyAccess(
+       variantTerm,
+       binaryVariantTerm,
+       keyTerm,
+       resultTerm,
+       nullTerm
+     )
+   } else {
+     throw new CodeGenException(s"Unsupported key type for variant: $keyType")
+   }
+
+   // The access code stores whatever the variant lookup returns. A lookup that yields a Java
+   // null (e.g. no such field / element) must also raise the null flag; otherwise the
+   // expression would report "not null" while holding a null reference.
+   val finalCode =
+     s"""
+        |${variant.code}
+        |${key.code}
+        |boolean $nullTerm = (${variant.nullTerm} || ${key.nullTerm});
+        |$variantTypeTerm $resultTerm = $variantDefault;
+        |if (!$nullTerm) {
+        |  $accessCode
+        |  if ($resultTerm == null) {
+        |    $nullTerm = true;
+        |  }
+        |}
+      """.stripMargin
+
+   GeneratedExpression(resultTerm, nullTerm, finalCode, variantType)
+ }
+
+ /**
+  * Builds the Java snippet that reads a field of an object variant using a string key.
+  *
+  * The emitted code throws a `TableRuntimeException` at runtime when the variant is not an
+  * object. For an object variant that is an instance of `BINARY_VARIANT`, the key is converted
+  * to a Java `String` and the field lookup result is assigned to `resultTerm`. In every other
+  * case `nullTerm` is set to `true`.
+  *
+  * @param variantTerm
+  *   term holding the variant value
+  * @param binaryVariantTerm
+  *   name of the local variable that receives the cast variant
+  * @param keyTerm
+  *   term holding the key value
+  * @param resultTerm
+  *   term that receives the looked-up field
+  * @param nullTerm
+  *   term of the null flag of the result
+  * @return
+  *   the Java source snippet
+  */
+ private def generateCharacterStringKeyAccess(
+     variantTerm: String,
+     binaryVariantTerm: String,
+     keyTerm: String,
+     resultTerm: String,
+     nullTerm: String): String = {
+   // Built as one Scala value: a physical line break inside the emitted Java string literal
+   // would make the generated code uncompilable.
+   val nonObjectMessage =
+     "String key access on variant requires an object variant, " +
+       "but a non-object variant was provided."
+   s"""
+      |  if ($variantTerm.isObject()){
+      |    if ($variantTerm instanceof $BINARY_VARIANT) {
+      |      $BINARY_VARIANT $binaryVariantTerm = ($BINARY_VARIANT) $variantTerm;
+      |
+      |      String keyStr = null;
+      |      if ($keyTerm instanceof $BINARY_STRING) {
+      |        keyStr = (($BINARY_STRING) $keyTerm).toString();
+      |      } else {
+      |        keyStr = String.valueOf($keyTerm);
+      |      }
+      |
+      |      if (keyStr != null) {
+      |        $resultTerm = $binaryVariantTerm.getField(keyStr);
+      |      } else {
+      |        $nullTerm = true;
+      |      }
+      |    } else {
+      |      $nullTerm = true;
+      |    }
+      |  } else {
+      |    throw new org.apache.flink.table.api.TableRuntimeException("$nonObjectMessage");
+      |  }
+      """.stripMargin
+ }
+
+ private def generateIntegerKeyAccess(
+ variantTerm: String,
+ binaryVariantTerm: String,
+ keyTerm: String,
+ resultTerm: String,
+ nullTerm: String): String = {
+ s"""
+ | if ($variantTerm.isArray()){
+ | if ($variantTerm instanceof $BINARY_VARIANT) {
+ | $BINARY_VARIANT $binaryVariantTerm = ($BINARY_VARIANT)
$variantTerm;
+ | $resultTerm = $binaryVariantTerm.getElement((int)$keyTerm - 1);
Review Comment:
Thanks for the comment. In Flink SQL, array indexing is 1-based, and I think
we should keep it consistent here and also use 1-based indexing. Similarly, in
Calcite, `VARIANT` indexing is also 1-based.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]