xuyangzhong commented on code in PR #27330:
URL: https://github.com/apache/flink/pull/27330#discussion_r2744516860
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala:
##########
@@ -1668,6 +1668,100 @@ object ScalarOperatorGens {
generateUnaryOperatorIfNotNull(ctx, resultType, map)(_ =>
s"${map.resultTerm}.size()")
}
+ def generateVariantGet(
+ ctx: CodeGeneratorContext,
+ variant: GeneratedExpression,
+ key: GeneratedExpression): GeneratedExpression = {
+ val Seq(resultTerm, nullTerm) = newNames(ctx, "result", "isNull")
+ val tmpValue = newName(ctx, "tmpValue")
+
+ val variantType = variant.resultType.asInstanceOf[VariantType]
+ val variantTerm = variant.resultTerm
+ val variantTypeTerm = primitiveTypeTermForType(variantType)
+ val variantDefaultTerm = primitiveDefaultValue(variantType)
+
+ val keyTerm = key.resultTerm
+ val keyType = key.resultType
+
+ val accessCode = if (isIntegerNumeric(keyType)) {
+ generateIntegerKeyAccess(
+ variantTerm,
+ variantTypeTerm,
+ keyTerm,
+ resultTerm,
+ nullTerm,
+ tmpValue
+ )
+ } else if (isCharacterString(keyType)) {
+ val fieldName = key.literalValue.get.toString
+ generateCharacterStringKeyAccess(
+ variantTerm,
+ variantTypeTerm,
+ fieldName,
+ resultTerm,
+ nullTerm,
+ tmpValue
+ )
+ } else {
+ throw new CodeGenException(s"Unsupported key type for variant: $keyType")
+ }
+
+ val finalCode =
+ s"""
+ |${variant.code}
+ |${key.code}
+ |boolean $nullTerm = (${variant.nullTerm} || ${key.nullTerm});
+ |$variantTypeTerm $resultTerm = $variantDefaultTerm;
+ |if (!$nullTerm) {
+ | $accessCode
+ |}
+ """.stripMargin
+
+ GeneratedExpression(resultTerm, nullTerm, finalCode, variantType)
+ }
+
+ private def generateCharacterStringKeyAccess(
+ variantTerm: String,
+ variantTypeTerm: String,
+ fieldName: String,
+ resultTerm: String,
+ nullTerm: String,
+ tmpValue: String): String = {
+ s"""
+ | if ($variantTerm.isObject()){
+ | $variantTypeTerm $tmpValue = $variantTerm.getField("$fieldName");
+ | if ($tmpValue == null) {
+ | $nullTerm = true;
+ | } else {
+ | $resultTerm = $tmpValue;
+ | }
+ | } else {
+ | throw new org.apache.flink.table.api.TableRuntimeException("String
key access on variant requires an object variant, but a non-object variant was
provided.");
+ | }
+ """.stripMargin
+ }
+
+ private def generateIntegerKeyAccess(
+ variantTerm: String,
+ variantTypeTerm: String,
+ keyTerm: String,
+ resultTerm: String,
+ nullTerm: String,
+ tmpValue: String): String = {
+ s"""
+ | if ($variantTerm.isArray()){
+ | $variantTypeTerm $tmpValue = $variantTerm.getElement((int)
$keyTerm - 1);
Review Comment:
Nit what about adding tests for other integer numeric? For example
```
VARIANT_ARRAY_ACCESS_WITH_DIFFERENT_INDEX_TYPES =
TableTestProgram.of(
"variant-array-access-with-different-index-types",
"validates variant array access with
different index types using [] operator in sql and at() in table api")
.setupTableSource(VARIANT_ARRAY_SOURCE)
.setupTableSink(VARIANT_ARRAY_SINK)
.runSql(
"INSERT INTO sink_t SELECT v[cast(1 as
tinyint)], v[cast(2 as smallint)], v[cast(4 as bigint)] FROM t")
.runTableApi(
t ->
t.from("t")
.select(
$("v").at((byte)
1).as("v1"),
$("v").at((short)
2).as("v2"),
$("v").at((long)
3).as("v3")),
"sink_t")
.build();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]