morningman commented on code in PR #58972:
URL: https://github.com/apache/doris/pull/58972#discussion_r2625955283


##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,474 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+import org.apache.doris.thrift.TParquetMetadataParams;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Table-valued function parquet_meta for reading Parquet metadata.
+ * Currently works in two modes:
+ * - parquet_meta (mode parquet_metadata): row-group/column statistics similar 
to DuckDB parquet_metadata()
+ * - parquet_schema: logical schema similar to DuckDB parquet_schema()
+ * - parquet_file_metadata: file-level metadata aligned with DuckDB 
parquet_file_metadata()
+ * - parquet_kv_metadata: file key/value metadata aligned with DuckDB 
parquet_kv_metadata()
+ * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB 
parquet_bloom_probe()
+ */
+public class ParquetMetadataTableValuedFunction extends 
MetadataTableValuedFunction {
+
+    public static final String NAME = "parquet_meta";
+    public static final String NAME_FILE_METADATA = "parquet_file_metadata";
+    public static final String NAME_KV_METADATA = "parquet_kv_metadata";
+    public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe";
+    private static final String PATH = "path";
+    private static final String MODE = "mode";
+    private static final String COLUMN = "column";
+    private static final String COLUMN_NAME = "column_name";
+    private static final String VALUE = "value";
+
+    private static final String MODE_METADATA = "parquet_metadata";
+    private static final String MODE_SCHEMA = "parquet_schema";
+    private static final String MODE_FILE_METADATA = "parquet_file_metadata";
+    private static final String MODE_KV_METADATA = "parquet_kv_metadata";
+    private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe";
+    private static final ImmutableSet<String> SUPPORTED_MODES =
+            ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, 
MODE_KV_METADATA,
+                    MODE_BLOOM_PROBE);
+
+    private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("repetition_level", PrimitiveType.INT, true),
+            new Column("definition_level", PrimitiveType.INT, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("precision", PrimitiveType.INT, true),
+            new Column("scale", PrimitiveType.INT, true),
+            new Column("is_nullable", PrimitiveType.BOOLEAN, true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("column_id", PrimitiveType.INT, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("converted_type", PrimitiveType.STRING, true),
+            new Column("num_values", PrimitiveType.BIGINT, true),
+            new Column("null_count", PrimitiveType.BIGINT, true),
+            new Column("distinct_count", PrimitiveType.BIGINT, true),
+            new Column("encodings", PrimitiveType.STRING, true),
+            new Column("compression", PrimitiveType.STRING, true),
+            new Column("data_page_offset", PrimitiveType.BIGINT, true),
+            new Column("index_page_offset", PrimitiveType.BIGINT, true),
+            new Column("dictionary_page_offset", PrimitiveType.BIGINT, true),
+            new Column("total_compressed_size", PrimitiveType.BIGINT, true),
+            new Column("total_uncompressed_size", PrimitiveType.BIGINT, true),
+            new Column("statistics_min", ScalarType.createStringType(), true),
+            new Column("statistics_max", ScalarType.createStringType(), true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("created_by", PrimitiveType.STRING, true),
+            new Column("num_rows", PrimitiveType.BIGINT, true),
+            new Column("num_row_groups", PrimitiveType.BIGINT, true),
+            new Column("format_version", PrimitiveType.BIGINT, true),
+            new Column("encryption_algorithm", PrimitiveType.STRING, true),
+            new Column("footer_signing_key_metadata", PrimitiveType.STRING, 
true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("key", ScalarType.createStringType(), true),
+            new Column("value", ScalarType.createStringType(), true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("bloom_filter_excludes", PrimitiveType.BOOLEAN, true)
+    );
+
+    private final List<String> paths;
+    private final String mode;
+    // File system info for remote Parquet access (e.g. S3).
+    private final TFileType fileType;
+    private final Map<String, String> properties;
+    private final String bloomColumn;
+    private final String bloomLiteral;
+
+    public ParquetMetadataTableValuedFunction(Map<String, String> params) 
throws AnalysisException {
+        Map<String, String> normalizedParams = 
params.entrySet().stream().collect(Collectors.toMap(
+                entry -> entry.getKey().toLowerCase(),
+                Map.Entry::getValue,
+                (value1, value2) -> value2
+        ));
+        String rawPath = normalizedParams.get(PATH);
+        if (Strings.isNullOrEmpty(rawPath)) {
+            throw new AnalysisException("Property 'path' is required for 
parquet_meta");
+        }
+        String parsedPath = rawPath.trim();
+        if (parsedPath.isEmpty()) {
+            throw new AnalysisException("Property 'path' must contain at least 
one location");
+        }
+        List<String> parsedPaths = Collections.singletonList(parsedPath);

Review Comment:
   There is only one path, so no need to use List



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,474 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+import org.apache.doris.thrift.TParquetMetadataParams;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Table-valued function parquet_meta for reading Parquet metadata.
+ * Currently works in two modes:
+ * - parquet_meta (mode parquet_metadata): row-group/column statistics similar 
to DuckDB parquet_metadata()
+ * - parquet_schema: logical schema similar to DuckDB parquet_schema()
+ * - parquet_file_metadata: file-level metadata aligned with DuckDB 
parquet_file_metadata()
+ * - parquet_kv_metadata: file key/value metadata aligned with DuckDB 
parquet_kv_metadata()
+ * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB 
parquet_bloom_probe()
+ */
+public class ParquetMetadataTableValuedFunction extends 
MetadataTableValuedFunction {
+
+    public static final String NAME = "parquet_meta";
+    public static final String NAME_FILE_METADATA = "parquet_file_metadata";
+    public static final String NAME_KV_METADATA = "parquet_kv_metadata";
+    public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe";
+    private static final String PATH = "path";
+    private static final String MODE = "mode";
+    private static final String COLUMN = "column";
+    private static final String COLUMN_NAME = "column_name";
+    private static final String VALUE = "value";
+
+    private static final String MODE_METADATA = "parquet_metadata";
+    private static final String MODE_SCHEMA = "parquet_schema";
+    private static final String MODE_FILE_METADATA = "parquet_file_metadata";
+    private static final String MODE_KV_METADATA = "parquet_kv_metadata";
+    private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe";
+    private static final ImmutableSet<String> SUPPORTED_MODES =
+            ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, 
MODE_KV_METADATA,
+                    MODE_BLOOM_PROBE);
+
+    private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("repetition_level", PrimitiveType.INT, true),
+            new Column("definition_level", PrimitiveType.INT, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("precision", PrimitiveType.INT, true),
+            new Column("scale", PrimitiveType.INT, true),
+            new Column("is_nullable", PrimitiveType.BOOLEAN, true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("column_id", PrimitiveType.INT, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("converted_type", PrimitiveType.STRING, true),
+            new Column("num_values", PrimitiveType.BIGINT, true),
+            new Column("null_count", PrimitiveType.BIGINT, true),
+            new Column("distinct_count", PrimitiveType.BIGINT, true),
+            new Column("encodings", PrimitiveType.STRING, true),
+            new Column("compression", PrimitiveType.STRING, true),
+            new Column("data_page_offset", PrimitiveType.BIGINT, true),
+            new Column("index_page_offset", PrimitiveType.BIGINT, true),
+            new Column("dictionary_page_offset", PrimitiveType.BIGINT, true),
+            new Column("total_compressed_size", PrimitiveType.BIGINT, true),
+            new Column("total_uncompressed_size", PrimitiveType.BIGINT, true),
+            new Column("statistics_min", ScalarType.createStringType(), true),
+            new Column("statistics_max", ScalarType.createStringType(), true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("created_by", PrimitiveType.STRING, true),
+            new Column("num_rows", PrimitiveType.BIGINT, true),
+            new Column("num_row_groups", PrimitiveType.BIGINT, true),
+            new Column("format_version", PrimitiveType.BIGINT, true),
+            new Column("encryption_algorithm", PrimitiveType.STRING, true),
+            new Column("footer_signing_key_metadata", PrimitiveType.STRING, 
true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("key", ScalarType.createStringType(), true),
+            new Column("value", ScalarType.createStringType(), true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("bloom_filter_excludes", PrimitiveType.BOOLEAN, true)
+    );
+
+    private final List<String> paths;
+    private final String mode;
+    // File system info for remote Parquet access (e.g. S3).
+    private final TFileType fileType;
+    private final Map<String, String> properties;
+    private final String bloomColumn;
+    private final String bloomLiteral;
+
+    public ParquetMetadataTableValuedFunction(Map<String, String> params) 
throws AnalysisException {
+        Map<String, String> normalizedParams = 
params.entrySet().stream().collect(Collectors.toMap(
+                entry -> entry.getKey().toLowerCase(),
+                Map.Entry::getValue,
+                (value1, value2) -> value2
+        ));
+        String rawPath = normalizedParams.get(PATH);
+        if (Strings.isNullOrEmpty(rawPath)) {
+            throw new AnalysisException("Property 'path' is required for 
parquet_meta");
+        }
+        String parsedPath = rawPath.trim();
+        if (parsedPath.isEmpty()) {
+            throw new AnalysisException("Property 'path' must contain at least 
one location");
+        }
+        List<String> parsedPaths = Collections.singletonList(parsedPath);
+        List<String> normalizedPaths = parsedPaths;
+
+        String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA);
+        mode = rawMode.toLowerCase();
+        if (!SUPPORTED_MODES.contains(mode)) {
+            throw new AnalysisException("Unsupported mode '" + rawMode + "' 
for parquet_meta");
+        }
+        String tmpBloomColumn = null;
+        String tmpBloomLiteral = null;
+        if (MODE_BLOOM_PROBE.equals(mode)) {
+            tmpBloomColumn = normalizedParams.get(COLUMN);
+            if (Strings.isNullOrEmpty(tmpBloomColumn)) {
+                tmpBloomColumn = normalizedParams.get(COLUMN_NAME);
+            }
+            tmpBloomLiteral = normalizedParams.get(VALUE);
+            if (Strings.isNullOrEmpty(tmpBloomColumn) || 
Strings.isNullOrEmpty(tmpBloomLiteral)) {
+                throw new AnalysisException(
+                        "Missing 'column' or 'value' for mode 
parquet_bloom_probe");
+            }
+            tmpBloomColumn = tmpBloomColumn.trim();
+            tmpBloomLiteral = tmpBloomLiteral.trim();
+            if (tmpBloomColumn.isEmpty() || tmpBloomLiteral.isEmpty()) {
+                throw new AnalysisException(
+                        "Missing 'column' or 'value' for mode 
parquet_bloom_probe");
+            }
+        }
+
+        String firstPath = parsedPaths.get(0);
+        String scheme = null;
+        try {
+            scheme = new URI(firstPath).getScheme();
+        } catch (URISyntaxException e) {
+            scheme = null;
+        }
+        Map<String, String> storageParams = new HashMap<>(normalizedParams);
+        // StorageProperties detects provider by "uri", we also hint support 
by scheme.
+        if (!Strings.isNullOrEmpty(scheme)) {
+            storageParams.put("uri", firstPath);
+        }
+        if (Strings.isNullOrEmpty(scheme) || "file".equalsIgnoreCase(scheme)) {
+            storageParams.put(StorageProperties.FS_LOCAL_SUPPORT, "true");
+        } else {
+            forceStorageSupport(storageParams, scheme.toLowerCase());
+        }
+        StorageProperties storageProperties;
+        try {
+            storageProperties = StorageProperties.createPrimary(storageParams);
+        } catch (RuntimeException e) {
+            throw new AnalysisException(
+                    "Failed to parse storage properties for parquet_meta: " + 
e.getMessage(), e);
+        }
+        this.fileType = mapToFileType(storageProperties.getType());
+        Map<String, String> backendProps = 
storageProperties.getBackendConfigProperties();
+        try {
+            List<String> tmpPaths = new ArrayList<>(parsedPaths.size());
+            for (String path : parsedPaths) {
+                tmpPaths.add(storageProperties.validateAndNormalizeUri(path));
+            }
+            normalizedPaths = tmpPaths;
+        } catch (UserException e) {
+            throw new AnalysisException(
+                    "Failed to normalize parquet_meta paths: " + 
e.getMessage(), e);
+        }
+        if (this.fileType == TFileType.FILE_HTTP && 
!backendProps.containsKey("uri")) {
+            backendProps = new HashMap<>(backendProps);
+            backendProps.put("uri", normalizedPaths.get(0));
+        }
+        this.properties = backendProps;
+
+        // Expand any glob patterns (e.g. *.parquet) to concrete file paths.
+        normalizedPaths = expandGlobPaths(normalizedPaths, storageProperties, 
storageParams, this.fileType);
+
+        this.paths = ImmutableList.copyOf(normalizedPaths);
+        this.bloomColumn = tmpBloomColumn;
+        this.bloomLiteral = tmpBloomLiteral;
+    }
+
+    /**
+     * Expand wildcard paths to matching files.
+     */
+    private static List<String> expandGlobPaths(List<String> inputPaths,
+                                                StorageProperties 
storageProperties,
+                                                Map<String, String> 
storageParams,
+                                                TFileType fileType) throws 
AnalysisException {
+        if (inputPaths == null || inputPaths.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<String> expanded = new ArrayList<>();
+        for (String path : inputPaths) {
+            if (!containsWildcards(path)) {
+                expanded.add(path);
+                continue;
+            }
+            expanded.addAll(expandSingleGlob(path, storageProperties, 
storageParams, fileType));
+        }
+        if (expanded.isEmpty()) {
+            throw new AnalysisException("No files matched parquet_meta path 
patterns: " + inputPaths);
+        }
+        return expanded;
+    }
+
+    private static boolean containsWildcards(String path) {
+        if (Strings.isNullOrEmpty(path)) {
+            return false;
+        }
+        return path.contains("*") || path.contains("[") || path.contains("{");
+    }
+
+    private static List<String> expandSingleGlob(String pattern,
+                                                 StorageProperties 
storageProperties,
+                                                 Map<String, String> 
storageParams,
+                                                 TFileType fileType) throws 
AnalysisException {
+        if (fileType == TFileType.FILE_LOCAL) {
+            return globLocal(pattern);
+        }
+        if (fileType == TFileType.FILE_HTTP) {
+            throw new AnalysisException("Glob patterns are not supported for 
file type: " + fileType);
+        }
+        if (storageProperties == null) {
+            throw new AnalysisException("Storage properties is required for 
glob pattern: " + pattern);
+        }
+        if (fileType == TFileType.FILE_S3 || fileType == TFileType.FILE_HDFS) {
+            return globRemoteWithBroker(pattern, storageParams);
+        }
+        throw new AnalysisException("Glob patterns are not supported for file 
type: " + fileType);
+    }
+
+    private static List<String> globRemoteWithBroker(String pattern,
+                                                     Map<String, String> 
storageParams) throws AnalysisException {
+        List<TBrokerFileStatus> remoteFiles = new ArrayList<>();
+        BrokerDesc brokerDesc = new BrokerDesc("ParquetMetaTvf", 
storageParams);
+        try {
+            BrokerUtil.parseFile(pattern, brokerDesc, remoteFiles);
+        } catch (UserException e) {
+            throw new AnalysisException("Failed to expand glob pattern '" + 
pattern + "': "
+                    + e.getMessage(), e);
+        }
+        return remoteFiles.stream()
+                .filter(file -> !file.isIsDir())
+                .map(TBrokerFileStatus::getPath)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Local glob expansion is executed on an arbitrary alive backend, because 
FE may not
+     * have access to BE local file system.
+     */
+    private static List<String> globLocal(String pattern) throws 
AnalysisException {
+        Backend backend;
+        try {
+            backend = 
Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values().stream()
+                    .filter(Backend::isAlive)
+                    .findFirst()
+                    .orElse(null);
+        } catch (AnalysisException e) {
+            throw e;
+        }
+        if (backend == null) {
+            throw new AnalysisException("No alive backends to expand local 
glob pattern");
+        }
+        BackendServiceProxy proxy = BackendServiceProxy.getInstance();
+        String patternForBe = pattern;
+        // BE safe_glob prepends user_files_secure_path (default 
${DORIS_HOME}); strip it to avoid double prefix.
+        List<String> securePrefixCandidates = new ArrayList<>();
+        String envSecure = System.getenv("USER_FILES_SECURE_PATH");
+        if (!Strings.isNullOrEmpty(envSecure)) {
+            securePrefixCandidates.add(envSecure);
+        }
+        String envDorisHome = System.getenv("DORIS_HOME");
+        if (!Strings.isNullOrEmpty(envDorisHome)) {
+            securePrefixCandidates.add(envDorisHome);
+        }
+        // 兜底:用路径所在目录作为前缀尝试剥离(针对在安全目录内的绝对路径)

Review Comment:
   English



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,474 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+import org.apache.doris.thrift.TParquetMetadataParams;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Table-valued function parquet_meta for reading Parquet metadata.
+ * Currently works in two modes:
+ * - parquet_meta (mode parquet_metadata): row-group/column statistics similar 
to DuckDB parquet_metadata()
+ * - parquet_schema: logical schema similar to DuckDB parquet_schema()
+ * - parquet_file_metadata: file-level metadata aligned with DuckDB 
parquet_file_metadata()
+ * - parquet_kv_metadata: file key/value metadata aligned with DuckDB 
parquet_kv_metadata()
+ * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB 
parquet_bloom_probe()
+ */
+public class ParquetMetadataTableValuedFunction extends 
MetadataTableValuedFunction {
+
+    public static final String NAME = "parquet_meta";
+    public static final String NAME_FILE_METADATA = "parquet_file_metadata";
+    public static final String NAME_KV_METADATA = "parquet_kv_metadata";
+    public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe";
+    private static final String PATH = "path";
+    private static final String MODE = "mode";
+    private static final String COLUMN = "column";
+    private static final String COLUMN_NAME = "column_name";
+    private static final String VALUE = "value";
+
+    private static final String MODE_METADATA = "parquet_metadata";
+    private static final String MODE_SCHEMA = "parquet_schema";
+    private static final String MODE_FILE_METADATA = "parquet_file_metadata";
+    private static final String MODE_KV_METADATA = "parquet_kv_metadata";
+    private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe";
+    private static final ImmutableSet<String> SUPPORTED_MODES =
+            ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, 
MODE_KV_METADATA,
+                    MODE_BLOOM_PROBE);
+
+    private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("repetition_level", PrimitiveType.INT, true),
+            new Column("definition_level", PrimitiveType.INT, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("precision", PrimitiveType.INT, true),
+            new Column("scale", PrimitiveType.INT, true),
+            new Column("is_nullable", PrimitiveType.BOOLEAN, true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("column_id", PrimitiveType.INT, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("converted_type", PrimitiveType.STRING, true),
+            new Column("num_values", PrimitiveType.BIGINT, true),
+            new Column("null_count", PrimitiveType.BIGINT, true),
+            new Column("distinct_count", PrimitiveType.BIGINT, true),
+            new Column("encodings", PrimitiveType.STRING, true),
+            new Column("compression", PrimitiveType.STRING, true),
+            new Column("data_page_offset", PrimitiveType.BIGINT, true),
+            new Column("index_page_offset", PrimitiveType.BIGINT, true),
+            new Column("dictionary_page_offset", PrimitiveType.BIGINT, true),
+            new Column("total_compressed_size", PrimitiveType.BIGINT, true),
+            new Column("total_uncompressed_size", PrimitiveType.BIGINT, true),
+            new Column("statistics_min", ScalarType.createStringType(), true),
+            new Column("statistics_max", ScalarType.createStringType(), true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("created_by", PrimitiveType.STRING, true),
+            new Column("num_rows", PrimitiveType.BIGINT, true),
+            new Column("num_row_groups", PrimitiveType.BIGINT, true),
+            new Column("format_version", PrimitiveType.BIGINT, true),
+            new Column("encryption_algorithm", PrimitiveType.STRING, true),
+            new Column("footer_signing_key_metadata", PrimitiveType.STRING, 
true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("key", ScalarType.createStringType(), true),
+            new Column("value", ScalarType.createStringType(), true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("bloom_filter_excludes", PrimitiveType.BOOLEAN, true)
+    );
+
+    private final List<String> paths;
+    private final String mode;
+    // File system info for remote Parquet access (e.g. S3).
+    private final TFileType fileType;
+    private final Map<String, String> properties;
+    private final String bloomColumn;
+    private final String bloomLiteral;
+
+    public ParquetMetadataTableValuedFunction(Map<String, String> params) 
throws AnalysisException {
+        Map<String, String> normalizedParams = 
params.entrySet().stream().collect(Collectors.toMap(
+                entry -> entry.getKey().toLowerCase(),
+                Map.Entry::getValue,
+                (value1, value2) -> value2
+        ));
+        String rawPath = normalizedParams.get(PATH);

Review Comment:
   Use `ExternalFileTableValuedFunction.URI_KEY` instead of `PATH`



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,474 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+import org.apache.doris.thrift.TParquetMetadataParams;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Table-valued function parquet_meta for reading Parquet metadata.
+ * Currently works in two modes:
+ * - parquet_meta (mode parquet_metadata): row-group/column statistics similar 
to DuckDB parquet_metadata()
+ * - parquet_schema: logical schema similar to DuckDB parquet_schema()
+ * - parquet_file_metadata: file-level metadata aligned with DuckDB 
parquet_file_metadata()
+ * - parquet_kv_metadata: file key/value metadata aligned with DuckDB 
parquet_kv_metadata()
+ * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB 
parquet_bloom_probe()
+ */
+public class ParquetMetadataTableValuedFunction extends 
MetadataTableValuedFunction {
+
+    public static final String NAME = "parquet_meta";
+    public static final String NAME_FILE_METADATA = "parquet_file_metadata";
+    public static final String NAME_KV_METADATA = "parquet_kv_metadata";
+    public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe";
+    private static final String PATH = "path";
+    private static final String MODE = "mode";
+    private static final String COLUMN = "column";
+    private static final String COLUMN_NAME = "column_name";
+    private static final String VALUE = "value";
+
+    private static final String MODE_METADATA = "parquet_metadata";
+    private static final String MODE_SCHEMA = "parquet_schema";
+    private static final String MODE_FILE_METADATA = "parquet_file_metadata";
+    private static final String MODE_KV_METADATA = "parquet_kv_metadata";
+    private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe";
+    private static final ImmutableSet<String> SUPPORTED_MODES =
+            ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, 
MODE_KV_METADATA,
+                    MODE_BLOOM_PROBE);
+
+    private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("repetition_level", PrimitiveType.INT, true),
+            new Column("definition_level", PrimitiveType.INT, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("precision", PrimitiveType.INT, true),
+            new Column("scale", PrimitiveType.INT, true),
+            new Column("is_nullable", PrimitiveType.BOOLEAN, true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("column_id", PrimitiveType.INT, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("converted_type", PrimitiveType.STRING, true),
+            new Column("num_values", PrimitiveType.BIGINT, true),
+            new Column("null_count", PrimitiveType.BIGINT, true),
+            new Column("distinct_count", PrimitiveType.BIGINT, true),
+            new Column("encodings", PrimitiveType.STRING, true),
+            new Column("compression", PrimitiveType.STRING, true),
+            new Column("data_page_offset", PrimitiveType.BIGINT, true),
+            new Column("index_page_offset", PrimitiveType.BIGINT, true),
+            new Column("dictionary_page_offset", PrimitiveType.BIGINT, true),
+            new Column("total_compressed_size", PrimitiveType.BIGINT, true),
+            new Column("total_uncompressed_size", PrimitiveType.BIGINT, true),
+            new Column("statistics_min", ScalarType.createStringType(), true),
+            new Column("statistics_max", ScalarType.createStringType(), true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("created_by", PrimitiveType.STRING, true),
+            new Column("num_rows", PrimitiveType.BIGINT, true),
+            new Column("num_row_groups", PrimitiveType.BIGINT, true),
+            new Column("format_version", PrimitiveType.BIGINT, true),
+            new Column("encryption_algorithm", PrimitiveType.STRING, true),
+            new Column("footer_signing_key_metadata", PrimitiveType.STRING, 
true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("key", ScalarType.createStringType(), true),
+            new Column("value", ScalarType.createStringType(), true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("bloom_filter_excludes", PrimitiveType.BOOLEAN, true)
+    );
+
+    private final List<String> paths;
+    private final String mode;
+    // File system info for remote Parquet access (e.g. S3).
+    private final TFileType fileType;
+    private final Map<String, String> properties;
+    private final String bloomColumn;
+    private final String bloomLiteral;
+
+    public ParquetMetadataTableValuedFunction(Map<String, String> params) 
throws AnalysisException {
+        Map<String, String> normalizedParams = 
params.entrySet().stream().collect(Collectors.toMap(
+                entry -> entry.getKey().toLowerCase(),
+                Map.Entry::getValue,
+                (value1, value2) -> value2
+        ));
+        String rawPath = normalizedParams.get(PATH);
+        if (Strings.isNullOrEmpty(rawPath)) {
+            throw new AnalysisException("Property 'path' is required for 
parquet_meta");
+        }
+        String parsedPath = rawPath.trim();
+        if (parsedPath.isEmpty()) {
+            throw new AnalysisException("Property 'path' must contain at least 
one location");
+        }
+        List<String> parsedPaths = Collections.singletonList(parsedPath);
+        List<String> normalizedPaths = parsedPaths;
+
+        String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA);
+        mode = rawMode.toLowerCase();
+        if (!SUPPORTED_MODES.contains(mode)) {
+            throw new AnalysisException("Unsupported mode '" + rawMode + "' 
for parquet_meta");
+        }
+        String tmpBloomColumn = null;
+        String tmpBloomLiteral = null;
+        if (MODE_BLOOM_PROBE.equals(mode)) {
+            tmpBloomColumn = normalizedParams.get(COLUMN);
+            if (Strings.isNullOrEmpty(tmpBloomColumn)) {
+                tmpBloomColumn = normalizedParams.get(COLUMN_NAME);
+            }
+            tmpBloomLiteral = normalizedParams.get(VALUE);
+            if (Strings.isNullOrEmpty(tmpBloomColumn) || 
Strings.isNullOrEmpty(tmpBloomLiteral)) {
+                throw new AnalysisException(
+                        "Missing 'column' or 'value' for mode 
parquet_bloom_probe");
+            }
+            tmpBloomColumn = tmpBloomColumn.trim();
+            tmpBloomLiteral = tmpBloomLiteral.trim();
+            if (tmpBloomColumn.isEmpty() || tmpBloomLiteral.isEmpty()) {
+                throw new AnalysisException(
+                        "Missing 'column' or 'value' for mode 
parquet_bloom_probe");
+            }
+        }
+
+        String firstPath = parsedPaths.get(0);
+        String scheme = null;
+        try {
+            scheme = new URI(firstPath).getScheme();
+        } catch (URISyntaxException e) {
+            scheme = null;
+        }
+        Map<String, String> storageParams = new HashMap<>(normalizedParams);
+        // StorageProperties detects provider by "uri", we also hint support 
by scheme.
+        if (!Strings.isNullOrEmpty(scheme)) {
+            storageParams.put("uri", firstPath);
+        }
+        if (Strings.isNullOrEmpty(scheme) || "file".equalsIgnoreCase(scheme)) {
+            storageParams.put(StorageProperties.FS_LOCAL_SUPPORT, "true");
+        } else {
+            forceStorageSupport(storageParams, scheme.toLowerCase());

Review Comment:
   I don't think we need to force set `fs.xx.support`.
   `torageProperties.createPrimary` can find the real storage type



##########
be/src/vec/exec/format/parquet/parquet_predicate.h:
##########
@@ -177,13 +177,16 @@ class ParquetPredicate {
         // TINYINT/SMALLINT also need conversion via 
LittleIntPhysicalConverter.
         switch (type) {
         case TYPE_BOOLEAN:
+        case TYPE_TINYINT:

Review Comment:
   why adding these new types?



##########
cloud/src/main.cpp:
##########
@@ -136,7 +136,7 @@ static void help() {
 
 static std::string build_info() {
     std::stringstream ss;
-// clang-format off
+    // clang-format off
 #if defined(NDEBUG)

Review Comment:
   do not change this



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,474 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+import org.apache.doris.thrift.TParquetMetadataParams;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Table-valued function parquet_meta for reading Parquet metadata.
+ * Currently works in two modes:
+ * - parquet_meta (mode parquet_metadata): row-group/column statistics similar 
to DuckDB parquet_metadata()
+ * - parquet_schema: logical schema similar to DuckDB parquet_schema()
+ * - parquet_file_metadata: file-level metadata aligned with DuckDB 
parquet_file_metadata()
+ * - parquet_kv_metadata: file key/value metadata aligned with DuckDB 
parquet_kv_metadata()
+ * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB 
parquet_bloom_probe()
+ */
+public class ParquetMetadataTableValuedFunction extends 
MetadataTableValuedFunction {
+
+    public static final String NAME = "parquet_meta";
+    public static final String NAME_FILE_METADATA = "parquet_file_metadata";
+    public static final String NAME_KV_METADATA = "parquet_kv_metadata";
+    public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe";
+    private static final String PATH = "path";
+    private static final String MODE = "mode";
+    private static final String COLUMN = "column";
+    private static final String COLUMN_NAME = "column_name";
+    private static final String VALUE = "value";
+
+    private static final String MODE_METADATA = "parquet_metadata";
+    private static final String MODE_SCHEMA = "parquet_schema";
+    private static final String MODE_FILE_METADATA = "parquet_file_metadata";
+    private static final String MODE_KV_METADATA = "parquet_kv_metadata";
+    private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe";
+    private static final ImmutableSet<String> SUPPORTED_MODES =
+            ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, 
MODE_KV_METADATA,
+                    MODE_BLOOM_PROBE);
+
+    private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("repetition_level", PrimitiveType.INT, true),
+            new Column("definition_level", PrimitiveType.INT, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("precision", PrimitiveType.INT, true),
+            new Column("scale", PrimitiveType.INT, true),
+            new Column("is_nullable", PrimitiveType.BOOLEAN, true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("column_id", PrimitiveType.INT, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("converted_type", PrimitiveType.STRING, true),
+            new Column("num_values", PrimitiveType.BIGINT, true),
+            new Column("null_count", PrimitiveType.BIGINT, true),
+            new Column("distinct_count", PrimitiveType.BIGINT, true),
+            new Column("encodings", PrimitiveType.STRING, true),
+            new Column("compression", PrimitiveType.STRING, true),
+            new Column("data_page_offset", PrimitiveType.BIGINT, true),
+            new Column("index_page_offset", PrimitiveType.BIGINT, true),
+            new Column("dictionary_page_offset", PrimitiveType.BIGINT, true),
+            new Column("total_compressed_size", PrimitiveType.BIGINT, true),
+            new Column("total_uncompressed_size", PrimitiveType.BIGINT, true),
+            new Column("statistics_min", ScalarType.createStringType(), true),
+            new Column("statistics_max", ScalarType.createStringType(), true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("created_by", PrimitiveType.STRING, true),
+            new Column("num_rows", PrimitiveType.BIGINT, true),
+            new Column("num_row_groups", PrimitiveType.BIGINT, true),
+            new Column("format_version", PrimitiveType.BIGINT, true),
+            new Column("encryption_algorithm", PrimitiveType.STRING, true),
+            new Column("footer_signing_key_metadata", PrimitiveType.STRING, 
true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("key", ScalarType.createStringType(), true),
+            new Column("value", ScalarType.createStringType(), true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("bloom_filter_excludes", PrimitiveType.BOOLEAN, true)
+    );
+
+    private final List<String> paths;
+    private final String mode;
+    // File system info for remote Parquet access (e.g. S3).
+    private final TFileType fileType;
+    private final Map<String, String> properties;
+    private final String bloomColumn;
+    private final String bloomLiteral;
+
+    public ParquetMetadataTableValuedFunction(Map<String, String> params) 
throws AnalysisException {
+        Map<String, String> normalizedParams = 
params.entrySet().stream().collect(Collectors.toMap(
+                entry -> entry.getKey().toLowerCase(),
+                Map.Entry::getValue,
+                (value1, value2) -> value2
+        ));
+        String rawPath = normalizedParams.get(PATH);
+        if (Strings.isNullOrEmpty(rawPath)) {
+            throw new AnalysisException("Property 'path' is required for 
parquet_meta");
+        }
+        String parsedPath = rawPath.trim();
+        if (parsedPath.isEmpty()) {
+            throw new AnalysisException("Property 'path' must contain at least 
one location");
+        }
+        List<String> parsedPaths = Collections.singletonList(parsedPath);
+        List<String> normalizedPaths = parsedPaths;
+
+        String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA);
+        mode = rawMode.toLowerCase();
+        if (!SUPPORTED_MODES.contains(mode)) {
+            throw new AnalysisException("Unsupported mode '" + rawMode + "' 
for parquet_meta");
+        }
+        String tmpBloomColumn = null;
+        String tmpBloomLiteral = null;
+        if (MODE_BLOOM_PROBE.equals(mode)) {
+            tmpBloomColumn = normalizedParams.get(COLUMN);
+            if (Strings.isNullOrEmpty(tmpBloomColumn)) {
+                tmpBloomColumn = normalizedParams.get(COLUMN_NAME);
+            }
+            tmpBloomLiteral = normalizedParams.get(VALUE);
+            if (Strings.isNullOrEmpty(tmpBloomColumn) || 
Strings.isNullOrEmpty(tmpBloomLiteral)) {
+                throw new AnalysisException(
+                        "Missing 'column' or 'value' for mode 
parquet_bloom_probe");
+            }
+            tmpBloomColumn = tmpBloomColumn.trim();
+            tmpBloomLiteral = tmpBloomLiteral.trim();
+            if (tmpBloomColumn.isEmpty() || tmpBloomLiteral.isEmpty()) {
+                throw new AnalysisException(
+                        "Missing 'column' or 'value' for mode 
parquet_bloom_probe");
+            }
+        }
+
+        String firstPath = parsedPaths.get(0);
+        String scheme = null;
+        try {
+            scheme = new URI(firstPath).getScheme();
+        } catch (URISyntaxException e) {
+            scheme = null;
+        }
+        Map<String, String> storageParams = new HashMap<>(normalizedParams);
+        // StorageProperties detects provider by "uri", we also hint support 
by scheme.
+        if (!Strings.isNullOrEmpty(scheme)) {
+            storageParams.put("uri", firstPath);
+        }
+        if (Strings.isNullOrEmpty(scheme) || "file".equalsIgnoreCase(scheme)) {
+            storageParams.put(StorageProperties.FS_LOCAL_SUPPORT, "true");
+        } else {
+            forceStorageSupport(storageParams, scheme.toLowerCase());
+        }
+        StorageProperties storageProperties;
+        try {
+            storageProperties = StorageProperties.createPrimary(storageParams);
+        } catch (RuntimeException e) {
+            throw new AnalysisException(
+                    "Failed to parse storage properties for parquet_meta: " + 
e.getMessage(), e);
+        }
+        this.fileType = mapToFileType(storageProperties.getType());
+        Map<String, String> backendProps = 
storageProperties.getBackendConfigProperties();
+        try {
+            List<String> tmpPaths = new ArrayList<>(parsedPaths.size());
+            for (String path : parsedPaths) {
+                tmpPaths.add(storageProperties.validateAndNormalizeUri(path));
+            }
+            normalizedPaths = tmpPaths;
+        } catch (UserException e) {
+            throw new AnalysisException(
+                    "Failed to normalize parquet_meta paths: " + 
e.getMessage(), e);
+        }
+        if (this.fileType == TFileType.FILE_HTTP && 
!backendProps.containsKey("uri")) {
+            backendProps = new HashMap<>(backendProps);
+            backendProps.put("uri", normalizedPaths.get(0));
+        }
+        this.properties = backendProps;
+
+        // Expand any glob patterns (e.g. *.parquet) to concrete file paths.
+        normalizedPaths = expandGlobPaths(normalizedPaths, storageProperties, 
storageParams, this.fileType);
+
+        this.paths = ImmutableList.copyOf(normalizedPaths);
+        this.bloomColumn = tmpBloomColumn;
+        this.bloomLiteral = tmpBloomLiteral;
+    }
+
+    /**
+     * Expand wildcard paths to matching files.
+     */
+    private static List<String> expandGlobPaths(List<String> inputPaths,
+                                                StorageProperties 
storageProperties,
+                                                Map<String, String> 
storageParams,
+                                                TFileType fileType) throws 
AnalysisException {
+        if (inputPaths == null || inputPaths.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<String> expanded = new ArrayList<>();
+        for (String path : inputPaths) {
+            if (!containsWildcards(path)) {
+                expanded.add(path);
+                continue;
+            }
+            expanded.addAll(expandSingleGlob(path, storageProperties, 
storageParams, fileType));
+        }
+        if (expanded.isEmpty()) {
+            throw new AnalysisException("No files matched parquet_meta path 
patterns: " + inputPaths);
+        }
+        return expanded;
+    }
+
+    private static boolean containsWildcards(String path) {
+        if (Strings.isNullOrEmpty(path)) {
+            return false;
+        }
+        return path.contains("*") || path.contains("[") || path.contains("{");
+    }
+
+    private static List<String> expandSingleGlob(String pattern,
+                                                 StorageProperties 
storageProperties,
+                                                 Map<String, String> 
storageParams,
+                                                 TFileType fileType) throws 
AnalysisException {
+        if (fileType == TFileType.FILE_LOCAL) {
+            return globLocal(pattern);

Review Comment:
   Why not reuse `LocalTableValuedFunction.getFileListFromBackend()` to get 
file list?



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,474 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+import org.apache.doris.thrift.TParquetMetadataParams;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Table-valued function parquet_meta for reading Parquet metadata.
+ * Currently works in two modes:
+ * - parquet_meta (mode parquet_metadata): row-group/column statistics similar 
to DuckDB parquet_metadata()
+ * - parquet_schema: logical schema similar to DuckDB parquet_schema()
+ * - parquet_file_metadata: file-level metadata aligned with DuckDB 
parquet_file_metadata()
+ * - parquet_kv_metadata: file key/value metadata aligned with DuckDB 
parquet_kv_metadata()
+ * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB 
parquet_bloom_probe()
+ */
+public class ParquetMetadataTableValuedFunction extends 
MetadataTableValuedFunction {
+
+    public static final String NAME = "parquet_meta";
+    public static final String NAME_FILE_METADATA = "parquet_file_metadata";
+    public static final String NAME_KV_METADATA = "parquet_kv_metadata";
+    public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe";
+    private static final String PATH = "path";
+    private static final String MODE = "mode";
+    private static final String COLUMN = "column";
+    private static final String COLUMN_NAME = "column_name";
+    private static final String VALUE = "value";
+
+    private static final String MODE_METADATA = "parquet_metadata";
+    private static final String MODE_SCHEMA = "parquet_schema";
+    private static final String MODE_FILE_METADATA = "parquet_file_metadata";
+    private static final String MODE_KV_METADATA = "parquet_kv_metadata";
+    private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe";
+    private static final ImmutableSet<String> SUPPORTED_MODES =
+            ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, 
MODE_KV_METADATA,
+                    MODE_BLOOM_PROBE);
+
+    private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("repetition_level", PrimitiveType.INT, true),
+            new Column("definition_level", PrimitiveType.INT, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("precision", PrimitiveType.INT, true),
+            new Column("scale", PrimitiveType.INT, true),
+            new Column("is_nullable", PrimitiveType.BOOLEAN, true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("column_id", PrimitiveType.INT, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("converted_type", PrimitiveType.STRING, true),
+            new Column("num_values", PrimitiveType.BIGINT, true),
+            new Column("null_count", PrimitiveType.BIGINT, true),
+            new Column("distinct_count", PrimitiveType.BIGINT, true),
+            new Column("encodings", PrimitiveType.STRING, true),
+            new Column("compression", PrimitiveType.STRING, true),
+            new Column("data_page_offset", PrimitiveType.BIGINT, true),
+            new Column("index_page_offset", PrimitiveType.BIGINT, true),
+            new Column("dictionary_page_offset", PrimitiveType.BIGINT, true),
+            new Column("total_compressed_size", PrimitiveType.BIGINT, true),
+            new Column("total_uncompressed_size", PrimitiveType.BIGINT, true),
+            new Column("statistics_min", ScalarType.createStringType(), true),
+            new Column("statistics_max", ScalarType.createStringType(), true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("created_by", PrimitiveType.STRING, true),
+            new Column("num_rows", PrimitiveType.BIGINT, true),
+            new Column("num_row_groups", PrimitiveType.BIGINT, true),
+            new Column("format_version", PrimitiveType.BIGINT, true),
+            new Column("encryption_algorithm", PrimitiveType.STRING, true),
+            new Column("footer_signing_key_metadata", PrimitiveType.STRING, 
true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("key", ScalarType.createStringType(), true),
+            new Column("value", ScalarType.createStringType(), true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = 
ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("bloom_filter_excludes", PrimitiveType.BOOLEAN, true)
+    );
+
+    private final List<String> paths;
+    private final String mode;
+    // File system info for remote Parquet access (e.g. S3).
+    private final TFileType fileType;
+    private final Map<String, String> properties;
+    private final String bloomColumn;
+    private final String bloomLiteral;
+
+    public ParquetMetadataTableValuedFunction(Map<String, String> params) 
throws AnalysisException {
+        Map<String, String> normalizedParams = 
params.entrySet().stream().collect(Collectors.toMap(
+                entry -> entry.getKey().toLowerCase(),
+                Map.Entry::getValue,
+                (value1, value2) -> value2
+        ));
+        String rawPath = normalizedParams.get(PATH);
+        if (Strings.isNullOrEmpty(rawPath)) {
+            throw new AnalysisException("Property 'path' is required for 
parquet_meta");
+        }
+        String parsedPath = rawPath.trim();
+        if (parsedPath.isEmpty()) {
+            throw new AnalysisException("Property 'path' must contain at least 
one location");
+        }
+        List<String> parsedPaths = Collections.singletonList(parsedPath);
+        List<String> normalizedPaths = parsedPaths;
+
+        String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA);
+        mode = rawMode.toLowerCase();
+        if (!SUPPORTED_MODES.contains(mode)) {
+            throw new AnalysisException("Unsupported mode '" + rawMode + "' 
for parquet_meta");
+        }
+        String tmpBloomColumn = null;
+        String tmpBloomLiteral = null;
+        if (MODE_BLOOM_PROBE.equals(mode)) {
+            tmpBloomColumn = normalizedParams.get(COLUMN);

Review Comment:
   what is different between COLUMN and COLUMN_NAME?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to