github-actions[bot] commented on code in PR #63192:
URL: https://github.com/apache/doris/pull/63192#discussion_r3253840980


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AccessPathPlanCollector.java:
##########
@@ -209,7 +213,7 @@ public Void visitLogicalGenerate(LogicalGenerate<? extends Plan> generate, State
                     if (useWholeItem) {
                         // use the whole column
                         for (Expression child : function.children()) {
-                            exprCollector.collect(child);
+                            exprCollector.collectWholeVariantExpression(child);
                         }

Review Comment:
   The generator propagation added here still misses `ExplodeVariantArray`. That function is a separate generator class, so it falls through to the generic `exprCollector.collect(function)` branch below. With an access on the generated output, such as `SELECT x['k'] FROM t LATERAL VIEW explode_variant_array(v) tmp AS x WHERE v[0]['p'] IS NOT NULL`, the output path on `x` is never propagated back to the input `v`. The sibling predicate can then leave only a narrow `[v, 0, p]` access path at the scan, even though `explode_variant_array(v)` needs the full array/root to produce `x['k']`, which causes missing or null results. This is distinct from the existing `LogicalGenerate` thread, which covers the regular `Explode`. Please handle `ExplodeVariantArray` explicitly and add coverage that combines generated-output access with a sibling VARIANT predicate.
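   To make the failure mode concrete, here is a minimal, self-contained Java model of the propagation rule requested above. Everything in it is hypothetical: `GeneratorAccessPathSketch`, `collect`, `minimize`, and `WHOLE_INPUT_GENERATORS` are illustrative names, not the `AccessPathPlanCollector` API, and access paths are modeled as plain string lists. It assumes that a shorter path with the same prefix reads a superset of a longer one, and shows that recording the whole input `[v]` for the generator subsumes the predicate's `[v, 0, p]`, whereas skipping the generator leaves only the narrow path.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of access-path collection; not Doris's real classes. */
final class GeneratorAccessPathSketch {
    // In this model, reading any output of these generators requires the whole input column.
    static final Set<String> WHOLE_INPUT_GENERATORS = Set.of("explode", "explode_variant_array");

    /** Keeps only paths that are not covered by a strictly shorter prefix path in the input. */
    static Set<List<String>> minimize(Collection<List<String>> paths) {
        Set<List<String>> result = new LinkedHashSet<>();
        for (List<String> p : paths) {
            boolean covered = false;
            for (List<String> q : paths) {
                if (q.size() < p.size() && p.subList(0, q.size()).equals(q)) {
                    covered = true;
                    break;
                }
            }
            if (!covered) {
                result.add(p);
            }
        }
        return result;
    }

    /**
     * Models the query above: a sibling predicate on v[0]['p'] plus an access x['k'] on the
     * output of generator(v). handleGenerator says whether the collector has an explicit
     * branch that propagates the output access back to the input v.
     */
    static Set<List<String>> collect(String generator, boolean handleGenerator) {
        List<List<String>> paths = new ArrayList<>();
        paths.add(List.of("v", "0", "p")); // from WHERE v[0]['p'] IS NOT NULL
        if (handleGenerator && WHOLE_INPUT_GENERATORS.contains(generator)) {
            paths.add(List.of("v")); // whole input needed to produce x['k']
        }
        return minimize(paths);
    }

    public static void main(String[] args) {
        // Without an explicit branch, only the narrow predicate path reaches the scan.
        System.out.println(collect("explode_variant_array", false)); // prints [[v, 0, p]]
        // With one, the whole column [v] subsumes [v, 0, p].
        System.out.println(collect("explode_variant_array", true)); // prints [[v]]
    }
}
```

   The real collector may merge paths differently; the point of the model is only that the generator branch has to contribute the root path before any narrowing happens.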



##########
regression-test/suites/external_table_p0/tvf/test_local_tvf_iceberg_variant.groovy:
##########
@@ -0,0 +1,444 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.net.InetAddress
+import java.net.NetworkInterface
+import java.nio.file.Files
+import java.nio.file.StandardCopyOption
+import org.apache.doris.regression.action.ProfileAction
+
+suite("test_local_tvf_iceberg_variant", "p0,external") {
+    List<List<Object>> backends = sql """ show backends """
+    assertTrue(backends.size() > 0)
+
+    def dataFilePath = context.config.dataPath + "/external_table_p0/tvf/"
+    def beId = backends[0][0]
+    def outFilePath = "/"
+    def unshreddedData = "${dataFilePath}/iceberg_variant_unshredded.parquet"
+    def shreddedData = "${dataFilePath}/iceberg_variant_shredded.parquet"
+    def typedOnlyData = "${dataFilePath}/iceberg_variant_typed_only.parquet"
+    def temporalUnshreddedData = "${dataFilePath}/iceberg_variant_temporal_unshredded.parquet"
+    def temporalTypedData = "${dataFilePath}/iceberg_variant_temporal_typed.parquet"
+    def binaryUnshreddedData = "${dataFilePath}/iceberg_variant_binary_unshredded.parquet"
+    def binaryTypedData = "${dataFilePath}/iceberg_variant_binary_typed.parquet"
+
+    def localHosts = ["localhost", "127.0.0.1", InetAddress.localHost.hostAddress, InetAddress.localHost.hostName] as Set
+    NetworkInterface.networkInterfaces.each { networkInterface ->
+        networkInterface.inetAddresses.each { inetAddress ->
+            localHosts.add(inetAddress.hostAddress)
+            localHosts.add(inetAddress.hostName)
+        }
+    }
+
+    def dorisHome = new File(context.config.dataPath).parentFile.parentFile
+    def localBeHome = new File(dorisHome, "output/be")
+    def localJdbc = context.config.jdbcUrl.contains("127.0.0.1") || context.config.jdbcUrl.contains("localhost")
+    if (localJdbc && backends.size() == 1 && localHosts.contains(backends[0][1]) && localBeHome.exists()) {
+        outFilePath = ""
+        Files.copy(new File(unshreddedData).toPath(), new File(localBeHome, "iceberg_variant_unshredded.parquet").toPath(),
+                StandardCopyOption.REPLACE_EXISTING)
+        Files.copy(new File(shreddedData).toPath(), new File(localBeHome, "iceberg_variant_shredded.parquet").toPath(),
+                StandardCopyOption.REPLACE_EXISTING)
+        Files.copy(new File(typedOnlyData).toPath(), new File(localBeHome, "iceberg_variant_typed_only.parquet").toPath(),
+                StandardCopyOption.REPLACE_EXISTING)
+        Files.copy(new File(temporalUnshreddedData).toPath(), new File(localBeHome, "iceberg_variant_temporal_unshredded.parquet").toPath(),
+                StandardCopyOption.REPLACE_EXISTING)
+        Files.copy(new File(temporalTypedData).toPath(), new File(localBeHome, "iceberg_variant_temporal_typed.parquet").toPath(),
+                StandardCopyOption.REPLACE_EXISTING)
+        Files.copy(new File(binaryUnshreddedData).toPath(), new File(localBeHome, "iceberg_variant_binary_unshredded.parquet").toPath(),
+                StandardCopyOption.REPLACE_EXISTING)
+        Files.copy(new File(binaryTypedData).toPath(), new File(localBeHome, "iceberg_variant_binary_typed.parquet").toPath(),
+                StandardCopyOption.REPLACE_EXISTING)
+    } else {
+        for (List<Object> backend : backends) {
+            def beHost = backend[1]
+            scpFiles("root", beHost, unshreddedData, outFilePath, false)
+            scpFiles("root", beHost, shreddedData, outFilePath, false)
+            scpFiles("root", beHost, typedOnlyData, outFilePath, false)
+            scpFiles("root", beHost, temporalUnshreddedData, outFilePath, false)
+            scpFiles("root", beHost, temporalTypedData, outFilePath, false)
+            scpFiles("root", beHost, binaryUnshreddedData, outFilePath, false)
+            scpFiles("root", beHost, binaryTypedData, outFilePath, false)
+        }
+    }
+
+    def unshredded = outFilePath + "iceberg_variant_unshredded.parquet"
+    def shredded = outFilePath + "iceberg_variant_shredded.parquet"
+    def typedOnly = outFilePath + "iceberg_variant_typed_only.parquet"
+    def temporalUnshredded = outFilePath + "iceberg_variant_temporal_unshredded.parquet"
+    def temporalTyped = outFilePath + "iceberg_variant_temporal_typed.parquet"
+    def binaryUnshredded = outFilePath + "iceberg_variant_binary_unshredded.parquet"
+    def binaryTyped = outFilePath + "iceberg_variant_binary_typed.parquet"
+    def profileAction = new ProfileAction(context)
+    def getProfileByToken = { token ->
+        for (int i = 0; i < 60; ++i) {
+            List profileData = profileAction.getProfileList()
+            for (final def profileItem in profileData) {
+                if (profileItem["Sql Statement"].toString().contains(token)) {
+                    def profileText = profileAction.getProfile(profileItem["Profile ID"].toString()).toString()
+                    if (profileText.contains("ParquetReadColumnPaths")) {
+                        return profileText
+                    }
+                }
+            }
+            Thread.sleep(1000)
+        }
+        assertTrue(false)
+    }
+    def getParquetReadColumnPathSet = { profileText ->
+        def parquetReadColumnPaths = profileText.readLines().find { it.contains("ParquetReadColumnPaths") }
+        assertTrue(parquetReadColumnPaths != null)
+        logger.info("Iceberg variant shredding ${parquetReadColumnPaths}")
+        def separatorIndex = parquetReadColumnPaths.indexOf(":")
+        assertTrue(separatorIndex >= 0)
+        return parquetReadColumnPaths.substring(separatorIndex + 1)
+                .split(",")
+                .collect { it.trim() }
+                .findAll { !it.isEmpty() } as Set
+    }
+    def getProfileCounter = { profileText, counterName ->
+        def counterLine = profileText.readLines().find { it.contains(counterName) }
+        assertTrue(counterLine != null)
+        def matcher = counterLine =~ /${counterName}:\s*([0-9,]+)/
+        assertTrue(matcher.find())
+        return matcher.group(1).replace(",", "").toLong()
+    }
+
+    qt_desc_unshredded """
+        desc function local(
+            "file_path" = "${unshredded}",
+            "backend_id" = "${beId}",
+            "format" = "parquet")
+    """
+
+    qt_desc_typed_only """
+        desc function local(
+            "file_path" = "${typedOnly}",
+            "backend_id" = "${beId}",
+            "format" = "parquet")
+    """
+
+    order_qt_unshredded_complex """
+        select id,
+               cast(v['id'] as int) as variant_id,
+               cast(v['name'] as string) as name,
+               cast(v['metric'] as bigint) as metric,
+               cast(v['nested']['score'] as int) as score,
+               cast(v['nested']['flag'] as boolean) as flag,
+               cast(v['arr'] as array<int>)[1] as first_arr,
+               cast(v['arr'] as array<text>)[2] as second_arr
+        from local(
+            "file_path" = "${unshredded}",
+            "backend_id" = "${beId}",
+            "format" = "parquet")
+        order by id
+    """
+
+    order_qt_shredded_fields """
+        select id,
+               cast(v['metric'] as bigint) as metric,
+               cast(v['name'] as string) as name
+        from local(
+            "file_path" = "${shredded}",
+            "backend_id" = "${beId}",
+            "format" = "parquet")
+        order by id
+    """
+
+    order_qt_shredded_full_variant_with_scalar """
+        select id,
+               cast(v as string) like '%"name":"name-%' as has_name,
+               cast(v as string) like '%"metric":%' as has_metric
+        from local(
+            "file_path" = "${shredded}",
+            "backend_id" = "${beId}",
+            "format" = "parquet")
+        order by id
+    """
+
+    order_qt_typed_only_fields """
+        select id,
+               cast(v['metric'] as bigint) as metric,
+               cast(v['nested']['x'] as string) as nested_x,
+               cast(v['f'] as string) is null as non_finite_float_is_null,
+               cast(v['items'] as string) as items
+        from local(
+            "file_path" = "${typedOnly}",
+            "backend_id" = "${beId}",
+            "format" = "parquet")
+        order by id
+    """
+
+    order_qt_typed_only_missing_field """
+        select id,
+               cast(v['missing'] as string) is null as missing_is_null
+        from local(
+            "file_path" = "${typedOnly}",
+            "backend_id" = "${beId}",
+            "format" = "parquet")
+        order by id
+    """
+
+    order_qt_typed_only_nested_missing_field """
+        select id,
+               cast(v['nested']['missing'] as string) is null as missing_is_null
+        from local(
+            "file_path" = "${typedOnly}",
+            "backend_id" = "${beId}",
+            "format" = "parquet")
+        order by id
+    """
+
+    order_qt_temporal_parity """
+        select u.id,
+               cast(u.v['d'] as bigint) as unshredded_date,
+               cast(t.v['d'] as bigint) as typed_date,
+               cast(u.v['t'] as bigint) as unshredded_time,
+               cast(t.v['t'] as bigint) as typed_time,
+               cast(u.v['ts'] as bigint) as unshredded_ts,
+               cast(t.v['ts'] as bigint) as typed_ts,
+               cast(u.v['d'] as bigint) = cast(t.v['d'] as bigint) as same_date,
+               cast(u.v['t'] as bigint) = cast(t.v['t'] as bigint) as same_time,
+               cast(u.v['ts'] as bigint) = cast(t.v['ts'] as bigint) as same_ts
+        from local(
+            "file_path" = "${temporalUnshredded}",
+            "backend_id" = "${beId}",
+            "format" = "parquet") u
+        join local(
+            "file_path" = "${temporalTyped}",
+            "backend_id" = "${beId}",
+            "format" = "parquet") t
+          on u.id = t.id
+        order by u.id
+    """
+
+    def binaryUnshreddedRows = sql """
+        select id, hex(cast(v['b'] as varbinary))
+        from local(
+            "file_path" = "${binaryUnshredded}",
+            "backend_id" = "${beId}",
+            "format" = "parquet",
+            "enable_mapping_varbinary" = "true")
+        order by id
+    """
+    assertEquals(2, binaryUnshreddedRows.size())
+    assertEquals("1", binaryUnshreddedRows[0][0].toString())
+    assertEquals("FF0041", binaryUnshreddedRows[0][1].toString())
+    assertEquals("2", binaryUnshreddedRows[1][0].toString())
+    assertEquals("C328", binaryUnshreddedRows[1][1].toString())
+
+    def binaryTypedRows = sql """
+        select id, hex(cast(v['b'] as varbinary))
+        from local(
+            "file_path" = "${binaryTyped}",
+            "backend_id" = "${beId}",
+            "format" = "parquet",
+            "enable_mapping_varbinary" = "true")
+        order by id
+    """
+    assertEquals(2, binaryTypedRows.size())
+    assertEquals("1", binaryTypedRows[0][0].toString())
+    assertEquals("FF0041", binaryTypedRows[0][1].toString())
+    assertEquals("2", binaryTypedRows[1][0].toString())
+    assertEquals("C328", binaryTypedRows[1][1].toString())
+
+    sql """ set enable_profile = true """
+    sql """ set profile_level = 2 """

Review Comment:
   This suite enables profiling for many subsequent assertions but disables it only at the very end. If any query or assertion between here and the final `set enable_profile = false` fails, the regression session keeps profiling enabled, which can pollute later profile-based checks and add avoidable overhead. Please wrap the profile section in `try/finally` (or otherwise reliably restore both `enable_profile` and `profile_level`) so that failures do not leak session state.
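   The `try/finally` shape requested here can be sketched in Java as follows; the suite itself is Groovy, where the same structure applies. `SqlSession`, `ProfileScope`, and `withProfiling` are hypothetical stand-ins for the suite's `sql` helper, and `restoreLevel` is assumed to be whatever `profile_level` was in effect before the suite changed it. The enabling statements sit inside `try`, so the restore runs even if enabling itself fails, and the `finally` block runs whether the body returns or throws.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the regression suite's sql("...") helper. */
interface SqlSession {
    void execute(String statement);
}

final class ProfileScope {
    /**
     * Enables profiling, runs the body, and always restores both settings.
     * restoreLevel is assumed to be the profile_level in effect before this call.
     */
    static void withProfiling(SqlSession session, int restoreLevel, Runnable body) {
        try {
            session.execute("set enable_profile = true");
            session.execute("set profile_level = 2");
            body.run();
        } finally {
            // Executed on normal return and when body (or the setup above) throws.
            session.execute("set enable_profile = false");
            session.execute("set profile_level = " + restoreLevel);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SqlSession recording = log::add; // records statements instead of sending them
        try {
            withProfiling(recording, 1, () -> {
                throw new IllegalStateException("simulated assertion failure");
            });
        } catch (IllegalStateException expected) {
            // The failure still reaches the caller, but both settings were restored first.
        }
        System.out.println(log);
    }
}
```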



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
