morningman commented on code in PR #58972: URL: https://github.com/apache/doris/pull/58972#discussion_r2625955283
########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java: ########## @@ -0,0 +1,474 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.datasource.property.storage.StorageProperties; +import org.apache.doris.proto.InternalService; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; +import org.apache.doris.thrift.TParquetMetadataParams; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + 
+import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Table-valued function parquet_meta for reading Parquet metadata. + * Currently works in five modes: + * - parquet_meta (mode parquet_metadata): row-group/column statistics similar to DuckDB parquet_metadata() + * - parquet_schema: logical schema similar to DuckDB parquet_schema() + * - parquet_file_metadata: file-level metadata aligned with DuckDB parquet_file_metadata() + * - parquet_kv_metadata: file key/value metadata aligned with DuckDB parquet_kv_metadata() + * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB parquet_bloom_probe() + */ +public class ParquetMetadataTableValuedFunction extends MetadataTableValuedFunction { + + public static final String NAME = "parquet_meta"; + public static final String NAME_FILE_METADATA = "parquet_file_metadata"; + public static final String NAME_KV_METADATA = "parquet_kv_metadata"; + public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe"; + private static final String PATH = "path"; + private static final String MODE = "mode"; + private static final String COLUMN = "column"; + private static final String COLUMN_NAME = "column_name"; + private static final String VALUE = "value"; + + private static final String MODE_METADATA = "parquet_metadata"; + private static final String MODE_SCHEMA = "parquet_schema"; + private static final String MODE_FILE_METADATA = "parquet_file_metadata"; + private static final String MODE_KV_METADATA = "parquet_kv_metadata"; + private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe"; + private static final ImmutableSet<String> SUPPORTED_MODES = + ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, MODE_KV_METADATA, + 
MODE_BLOOM_PROBE); + + private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("column_name", PrimitiveType.STRING, true), + new Column("column_path", PrimitiveType.STRING, true), + new Column("physical_type", PrimitiveType.STRING, true), + new Column("logical_type", PrimitiveType.STRING, true), + new Column("repetition_level", PrimitiveType.INT, true), + new Column("definition_level", PrimitiveType.INT, true), + new Column("type_length", PrimitiveType.INT, true), + new Column("precision", PrimitiveType.INT, true), + new Column("scale", PrimitiveType.INT, true), + new Column("is_nullable", PrimitiveType.BOOLEAN, true) + ); + + private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + new Column("column_id", PrimitiveType.INT, true), + new Column("column_name", PrimitiveType.STRING, true), + new Column("column_path", PrimitiveType.STRING, true), + new Column("physical_type", PrimitiveType.STRING, true), + new Column("logical_type", PrimitiveType.STRING, true), + new Column("type_length", PrimitiveType.INT, true), + new Column("converted_type", PrimitiveType.STRING, true), + new Column("num_values", PrimitiveType.BIGINT, true), + new Column("null_count", PrimitiveType.BIGINT, true), + new Column("distinct_count", PrimitiveType.BIGINT, true), + new Column("encodings", PrimitiveType.STRING, true), + new Column("compression", PrimitiveType.STRING, true), + new Column("data_page_offset", PrimitiveType.BIGINT, true), + new Column("index_page_offset", PrimitiveType.BIGINT, true), + new Column("dictionary_page_offset", PrimitiveType.BIGINT, true), + new Column("total_compressed_size", PrimitiveType.BIGINT, true), + new Column("total_uncompressed_size", PrimitiveType.BIGINT, true), + new Column("statistics_min", 
ScalarType.createStringType(), true), + new Column("statistics_max", ScalarType.createStringType(), true) + ); + + private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("created_by", PrimitiveType.STRING, true), + new Column("num_rows", PrimitiveType.BIGINT, true), + new Column("num_row_groups", PrimitiveType.BIGINT, true), + new Column("format_version", PrimitiveType.BIGINT, true), + new Column("encryption_algorithm", PrimitiveType.STRING, true), + new Column("footer_signing_key_metadata", PrimitiveType.STRING, true) + ); + + private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("key", ScalarType.createStringType(), true), + new Column("value", ScalarType.createStringType(), true) + ); + + private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + new Column("bloom_filter_excludes", PrimitiveType.BOOLEAN, true) + ); + + private final List<String> paths; + private final String mode; + // File system info for remote Parquet access (e.g. S3). 
+ private final TFileType fileType; + private final Map<String, String> properties; + private final String bloomColumn; + private final String bloomLiteral; + + public ParquetMetadataTableValuedFunction(Map<String, String> params) throws AnalysisException { + Map<String, String> normalizedParams = params.entrySet().stream().collect(Collectors.toMap( + entry -> entry.getKey().toLowerCase(), + Map.Entry::getValue, + (value1, value2) -> value2 + )); + String rawPath = normalizedParams.get(PATH); + if (Strings.isNullOrEmpty(rawPath)) { + throw new AnalysisException("Property 'path' is required for parquet_meta"); + } + String parsedPath = rawPath.trim(); + if (parsedPath.isEmpty()) { + throw new AnalysisException("Property 'path' must contain at least one location"); + } + List<String> parsedPaths = Collections.singletonList(parsedPath); Review Comment: There is only one path, so no need to use List ########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java: ########## @@ -0,0 +1,474 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.datasource.property.storage.StorageProperties; +import org.apache.doris.proto.InternalService; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; +import org.apache.doris.thrift.TParquetMetadataParams; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Table-valued function parquet_meta for reading Parquet metadata. 
+ * Currently works in five modes: + * - parquet_meta (mode parquet_metadata): row-group/column statistics similar to DuckDB parquet_metadata() + * - parquet_schema: logical schema similar to DuckDB parquet_schema() + * - parquet_file_metadata: file-level metadata aligned with DuckDB parquet_file_metadata() + * - parquet_kv_metadata: file key/value metadata aligned with DuckDB parquet_kv_metadata() + * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB parquet_bloom_probe() + */ +public class ParquetMetadataTableValuedFunction extends MetadataTableValuedFunction { + + public static final String NAME = "parquet_meta"; + public static final String NAME_FILE_METADATA = "parquet_file_metadata"; + public static final String NAME_KV_METADATA = "parquet_kv_metadata"; + public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe"; + private static final String PATH = "path"; + private static final String MODE = "mode"; + private static final String COLUMN = "column"; + private static final String COLUMN_NAME = "column_name"; + private static final String VALUE = "value"; + + private static final String MODE_METADATA = "parquet_metadata"; + private static final String MODE_SCHEMA = "parquet_schema"; + private static final String MODE_FILE_METADATA = "parquet_file_metadata"; + private static final String MODE_KV_METADATA = "parquet_kv_metadata"; + private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe"; + private static final ImmutableSet<String> SUPPORTED_MODES = + ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, MODE_KV_METADATA, + MODE_BLOOM_PROBE); + + private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("column_name", PrimitiveType.STRING, true), + new Column("column_path", PrimitiveType.STRING, true), + new Column("physical_type", PrimitiveType.STRING, true), + new Column("logical_type", PrimitiveType.STRING, true), 
+ new Column("repetition_level", PrimitiveType.INT, true), + new Column("definition_level", PrimitiveType.INT, true), + new Column("type_length", PrimitiveType.INT, true), + new Column("precision", PrimitiveType.INT, true), + new Column("scale", PrimitiveType.INT, true), + new Column("is_nullable", PrimitiveType.BOOLEAN, true) + ); + + private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + new Column("column_id", PrimitiveType.INT, true), + new Column("column_name", PrimitiveType.STRING, true), + new Column("column_path", PrimitiveType.STRING, true), + new Column("physical_type", PrimitiveType.STRING, true), + new Column("logical_type", PrimitiveType.STRING, true), + new Column("type_length", PrimitiveType.INT, true), + new Column("converted_type", PrimitiveType.STRING, true), + new Column("num_values", PrimitiveType.BIGINT, true), + new Column("null_count", PrimitiveType.BIGINT, true), + new Column("distinct_count", PrimitiveType.BIGINT, true), + new Column("encodings", PrimitiveType.STRING, true), + new Column("compression", PrimitiveType.STRING, true), + new Column("data_page_offset", PrimitiveType.BIGINT, true), + new Column("index_page_offset", PrimitiveType.BIGINT, true), + new Column("dictionary_page_offset", PrimitiveType.BIGINT, true), + new Column("total_compressed_size", PrimitiveType.BIGINT, true), + new Column("total_uncompressed_size", PrimitiveType.BIGINT, true), + new Column("statistics_min", ScalarType.createStringType(), true), + new Column("statistics_max", ScalarType.createStringType(), true) + ); + + private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("created_by", PrimitiveType.STRING, true), + new Column("num_rows", PrimitiveType.BIGINT, true), + new Column("num_row_groups", 
PrimitiveType.BIGINT, true), + new Column("format_version", PrimitiveType.BIGINT, true), + new Column("encryption_algorithm", PrimitiveType.STRING, true), + new Column("footer_signing_key_metadata", PrimitiveType.STRING, true) + ); + + private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("key", ScalarType.createStringType(), true), + new Column("value", ScalarType.createStringType(), true) + ); + + private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + new Column("bloom_filter_excludes", PrimitiveType.BOOLEAN, true) + ); + + private final List<String> paths; + private final String mode; + // File system info for remote Parquet access (e.g. S3). + private final TFileType fileType; + private final Map<String, String> properties; + private final String bloomColumn; + private final String bloomLiteral; + + public ParquetMetadataTableValuedFunction(Map<String, String> params) throws AnalysisException { + Map<String, String> normalizedParams = params.entrySet().stream().collect(Collectors.toMap( + entry -> entry.getKey().toLowerCase(), + Map.Entry::getValue, + (value1, value2) -> value2 + )); + String rawPath = normalizedParams.get(PATH); + if (Strings.isNullOrEmpty(rawPath)) { + throw new AnalysisException("Property 'path' is required for parquet_meta"); + } + String parsedPath = rawPath.trim(); + if (parsedPath.isEmpty()) { + throw new AnalysisException("Property 'path' must contain at least one location"); + } + List<String> parsedPaths = Collections.singletonList(parsedPath); + List<String> normalizedPaths = parsedPaths; + + String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA); + mode = rawMode.toLowerCase(); + if (!SUPPORTED_MODES.contains(mode)) { + throw new AnalysisException("Unsupported mode '" + 
rawMode + "' for parquet_meta"); + } + String tmpBloomColumn = null; + String tmpBloomLiteral = null; + if (MODE_BLOOM_PROBE.equals(mode)) { + tmpBloomColumn = normalizedParams.get(COLUMN); + if (Strings.isNullOrEmpty(tmpBloomColumn)) { + tmpBloomColumn = normalizedParams.get(COLUMN_NAME); + } + tmpBloomLiteral = normalizedParams.get(VALUE); + if (Strings.isNullOrEmpty(tmpBloomColumn) || Strings.isNullOrEmpty(tmpBloomLiteral)) { + throw new AnalysisException( + "Missing 'column' or 'value' for mode parquet_bloom_probe"); + } + tmpBloomColumn = tmpBloomColumn.trim(); + tmpBloomLiteral = tmpBloomLiteral.trim(); + if (tmpBloomColumn.isEmpty() || tmpBloomLiteral.isEmpty()) { + throw new AnalysisException( + "Missing 'column' or 'value' for mode parquet_bloom_probe"); + } + } + + String firstPath = parsedPaths.get(0); + String scheme = null; + try { + scheme = new URI(firstPath).getScheme(); + } catch (URISyntaxException e) { + scheme = null; + } + Map<String, String> storageParams = new HashMap<>(normalizedParams); + // StorageProperties detects provider by "uri", we also hint support by scheme. 
+ if (!Strings.isNullOrEmpty(scheme)) { + storageParams.put("uri", firstPath); + } + if (Strings.isNullOrEmpty(scheme) || "file".equalsIgnoreCase(scheme)) { + storageParams.put(StorageProperties.FS_LOCAL_SUPPORT, "true"); + } else { + forceStorageSupport(storageParams, scheme.toLowerCase()); + } + StorageProperties storageProperties; + try { + storageProperties = StorageProperties.createPrimary(storageParams); + } catch (RuntimeException e) { + throw new AnalysisException( + "Failed to parse storage properties for parquet_meta: " + e.getMessage(), e); + } + this.fileType = mapToFileType(storageProperties.getType()); + Map<String, String> backendProps = storageProperties.getBackendConfigProperties(); + try { + List<String> tmpPaths = new ArrayList<>(parsedPaths.size()); + for (String path : parsedPaths) { + tmpPaths.add(storageProperties.validateAndNormalizeUri(path)); + } + normalizedPaths = tmpPaths; + } catch (UserException e) { + throw new AnalysisException( + "Failed to normalize parquet_meta paths: " + e.getMessage(), e); + } + if (this.fileType == TFileType.FILE_HTTP && !backendProps.containsKey("uri")) { + backendProps = new HashMap<>(backendProps); + backendProps.put("uri", normalizedPaths.get(0)); + } + this.properties = backendProps; + + // Expand any glob patterns (e.g. *.parquet) to concrete file paths. + normalizedPaths = expandGlobPaths(normalizedPaths, storageProperties, storageParams, this.fileType); + + this.paths = ImmutableList.copyOf(normalizedPaths); + this.bloomColumn = tmpBloomColumn; + this.bloomLiteral = tmpBloomLiteral; + } + + /** + * Expand wildcard paths to matching files. 
+ */ + private static List<String> expandGlobPaths(List<String> inputPaths, + StorageProperties storageProperties, + Map<String, String> storageParams, + TFileType fileType) throws AnalysisException { + if (inputPaths == null || inputPaths.isEmpty()) { + return Collections.emptyList(); + } + List<String> expanded = new ArrayList<>(); + for (String path : inputPaths) { + if (!containsWildcards(path)) { + expanded.add(path); + continue; + } + expanded.addAll(expandSingleGlob(path, storageProperties, storageParams, fileType)); + } + if (expanded.isEmpty()) { + throw new AnalysisException("No files matched parquet_meta path patterns: " + inputPaths); + } + return expanded; + } + + private static boolean containsWildcards(String path) { + if (Strings.isNullOrEmpty(path)) { + return false; + } + return path.contains("*") || path.contains("[") || path.contains("{"); + } + + private static List<String> expandSingleGlob(String pattern, + StorageProperties storageProperties, + Map<String, String> storageParams, + TFileType fileType) throws AnalysisException { + if (fileType == TFileType.FILE_LOCAL) { + return globLocal(pattern); + } + if (fileType == TFileType.FILE_HTTP) { + throw new AnalysisException("Glob patterns are not supported for file type: " + fileType); + } + if (storageProperties == null) { + throw new AnalysisException("Storage properties is required for glob pattern: " + pattern); + } + if (fileType == TFileType.FILE_S3 || fileType == TFileType.FILE_HDFS) { + return globRemoteWithBroker(pattern, storageParams); + } + throw new AnalysisException("Glob patterns are not supported for file type: " + fileType); + } + + private static List<String> globRemoteWithBroker(String pattern, + Map<String, String> storageParams) throws AnalysisException { + List<TBrokerFileStatus> remoteFiles = new ArrayList<>(); + BrokerDesc brokerDesc = new BrokerDesc("ParquetMetaTvf", storageParams); + try { + BrokerUtil.parseFile(pattern, brokerDesc, remoteFiles); + } catch (UserException 
e) { + throw new AnalysisException("Failed to expand glob pattern '" + pattern + "': " + + e.getMessage(), e); + } + return remoteFiles.stream() + .filter(file -> !file.isIsDir()) + .map(TBrokerFileStatus::getPath) + .collect(Collectors.toList()); + } + + /** + * Local glob expansion is executed on an arbitrary alive backend, because FE may not + * have access to BE local file system. + */ + private static List<String> globLocal(String pattern) throws AnalysisException { + Backend backend; + try { + backend = Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values().stream() + .filter(Backend::isAlive) + .findFirst() + .orElse(null); + } catch (AnalysisException e) { + throw e; + } + if (backend == null) { + throw new AnalysisException("No alive backends to expand local glob pattern"); + } + BackendServiceProxy proxy = BackendServiceProxy.getInstance(); + String patternForBe = pattern; + // BE safe_glob prepends user_files_secure_path (default ${DORIS_HOME}); strip it to avoid double prefix. + List<String> securePrefixCandidates = new ArrayList<>(); + String envSecure = System.getenv("USER_FILES_SECURE_PATH"); + if (!Strings.isNullOrEmpty(envSecure)) { + securePrefixCandidates.add(envSecure); + } + String envDorisHome = System.getenv("DORIS_HOME"); + if (!Strings.isNullOrEmpty(envDorisHome)) { + securePrefixCandidates.add(envDorisHome); + } + // 兜底:用路径所在目录作为前缀尝试剥离(针对在安全目录内的绝对路径) Review Comment: Please write this code comment in English. ########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java: ########## @@ -0,0 +1,474 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.datasource.property.storage.StorageProperties; +import org.apache.doris.proto.InternalService; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; +import org.apache.doris.thrift.TParquetMetadataParams; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Table-valued function parquet_meta for reading Parquet metadata. 
+ * Currently works in five modes: + * - parquet_meta (mode parquet_metadata): row-group/column statistics similar to DuckDB parquet_metadata() + * - parquet_schema: logical schema similar to DuckDB parquet_schema() + * - parquet_file_metadata: file-level metadata aligned with DuckDB parquet_file_metadata() + * - parquet_kv_metadata: file key/value metadata aligned with DuckDB parquet_kv_metadata() + * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB parquet_bloom_probe() + */ +public class ParquetMetadataTableValuedFunction extends MetadataTableValuedFunction { + + public static final String NAME = "parquet_meta"; + public static final String NAME_FILE_METADATA = "parquet_file_metadata"; + public static final String NAME_KV_METADATA = "parquet_kv_metadata"; + public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe"; + private static final String PATH = "path"; + private static final String MODE = "mode"; + private static final String COLUMN = "column"; + private static final String COLUMN_NAME = "column_name"; + private static final String VALUE = "value"; + + private static final String MODE_METADATA = "parquet_metadata"; + private static final String MODE_SCHEMA = "parquet_schema"; + private static final String MODE_FILE_METADATA = "parquet_file_metadata"; + private static final String MODE_KV_METADATA = "parquet_kv_metadata"; + private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe"; + private static final ImmutableSet<String> SUPPORTED_MODES = + ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, MODE_KV_METADATA, + MODE_BLOOM_PROBE); + + private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("column_name", PrimitiveType.STRING, true), + new Column("column_path", PrimitiveType.STRING, true), + new Column("physical_type", PrimitiveType.STRING, true), + new Column("logical_type", PrimitiveType.STRING, true), 
+ new Column("repetition_level", PrimitiveType.INT, true), + new Column("definition_level", PrimitiveType.INT, true), + new Column("type_length", PrimitiveType.INT, true), + new Column("precision", PrimitiveType.INT, true), + new Column("scale", PrimitiveType.INT, true), + new Column("is_nullable", PrimitiveType.BOOLEAN, true) + ); + + private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + new Column("column_id", PrimitiveType.INT, true), + new Column("column_name", PrimitiveType.STRING, true), + new Column("column_path", PrimitiveType.STRING, true), + new Column("physical_type", PrimitiveType.STRING, true), + new Column("logical_type", PrimitiveType.STRING, true), + new Column("type_length", PrimitiveType.INT, true), + new Column("converted_type", PrimitiveType.STRING, true), + new Column("num_values", PrimitiveType.BIGINT, true), + new Column("null_count", PrimitiveType.BIGINT, true), + new Column("distinct_count", PrimitiveType.BIGINT, true), + new Column("encodings", PrimitiveType.STRING, true), + new Column("compression", PrimitiveType.STRING, true), + new Column("data_page_offset", PrimitiveType.BIGINT, true), + new Column("index_page_offset", PrimitiveType.BIGINT, true), + new Column("dictionary_page_offset", PrimitiveType.BIGINT, true), + new Column("total_compressed_size", PrimitiveType.BIGINT, true), + new Column("total_uncompressed_size", PrimitiveType.BIGINT, true), + new Column("statistics_min", ScalarType.createStringType(), true), + new Column("statistics_max", ScalarType.createStringType(), true) + ); + + private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("created_by", PrimitiveType.STRING, true), + new Column("num_rows", PrimitiveType.BIGINT, true), + new Column("num_row_groups", 
PrimitiveType.BIGINT, true), + new Column("format_version", PrimitiveType.BIGINT, true), + new Column("encryption_algorithm", PrimitiveType.STRING, true), + new Column("footer_signing_key_metadata", PrimitiveType.STRING, true) + ); + + private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("key", ScalarType.createStringType(), true), + new Column("value", ScalarType.createStringType(), true) + ); + + private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + new Column("bloom_filter_excludes", PrimitiveType.BOOLEAN, true) + ); + + private final List<String> paths; + private final String mode; + // File system info for remote Parquet access (e.g. S3). + private final TFileType fileType; + private final Map<String, String> properties; + private final String bloomColumn; + private final String bloomLiteral; + + public ParquetMetadataTableValuedFunction(Map<String, String> params) throws AnalysisException { + Map<String, String> normalizedParams = params.entrySet().stream().collect(Collectors.toMap( + entry -> entry.getKey().toLowerCase(), + Map.Entry::getValue, + (value1, value2) -> value2 + )); + String rawPath = normalizedParams.get(PATH); Review Comment: Use `ExternalFileTableValuedFunction.URI_KEY` instead of `PATH` ########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java: ########## @@ -0,0 +1,474 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.datasource.property.storage.StorageProperties; +import org.apache.doris.proto.InternalService; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; +import org.apache.doris.thrift.TParquetMetadataParams; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Table-valued function parquet_meta for 
reading Parquet metadata. + * Currently works in two modes: + * - parquet_meta (mode parquet_metadata): row-group/column statistics similar to DuckDB parquet_metadata() + * - parquet_schema: logical schema similar to DuckDB parquet_schema() + * - parquet_file_metadata: file-level metadata aligned with DuckDB parquet_file_metadata() + * - parquet_kv_metadata: file key/value metadata aligned with DuckDB parquet_kv_metadata() + * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB parquet_bloom_probe() + */ +public class ParquetMetadataTableValuedFunction extends MetadataTableValuedFunction { + + public static final String NAME = "parquet_meta"; + public static final String NAME_FILE_METADATA = "parquet_file_metadata"; + public static final String NAME_KV_METADATA = "parquet_kv_metadata"; + public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe"; + private static final String PATH = "path"; + private static final String MODE = "mode"; + private static final String COLUMN = "column"; + private static final String COLUMN_NAME = "column_name"; + private static final String VALUE = "value"; + + private static final String MODE_METADATA = "parquet_metadata"; + private static final String MODE_SCHEMA = "parquet_schema"; + private static final String MODE_FILE_METADATA = "parquet_file_metadata"; + private static final String MODE_KV_METADATA = "parquet_kv_metadata"; + private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe"; + private static final ImmutableSet<String> SUPPORTED_MODES = + ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, MODE_KV_METADATA, + MODE_BLOOM_PROBE); + + private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("column_name", PrimitiveType.STRING, true), + new Column("column_path", PrimitiveType.STRING, true), + new Column("physical_type", PrimitiveType.STRING, true), + new Column("logical_type", 
PrimitiveType.STRING, true), + new Column("repetition_level", PrimitiveType.INT, true), + new Column("definition_level", PrimitiveType.INT, true), + new Column("type_length", PrimitiveType.INT, true), + new Column("precision", PrimitiveType.INT, true), + new Column("scale", PrimitiveType.INT, true), + new Column("is_nullable", PrimitiveType.BOOLEAN, true) + ); + + private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + new Column("column_id", PrimitiveType.INT, true), + new Column("column_name", PrimitiveType.STRING, true), + new Column("column_path", PrimitiveType.STRING, true), + new Column("physical_type", PrimitiveType.STRING, true), + new Column("logical_type", PrimitiveType.STRING, true), + new Column("type_length", PrimitiveType.INT, true), + new Column("converted_type", PrimitiveType.STRING, true), + new Column("num_values", PrimitiveType.BIGINT, true), + new Column("null_count", PrimitiveType.BIGINT, true), + new Column("distinct_count", PrimitiveType.BIGINT, true), + new Column("encodings", PrimitiveType.STRING, true), + new Column("compression", PrimitiveType.STRING, true), + new Column("data_page_offset", PrimitiveType.BIGINT, true), + new Column("index_page_offset", PrimitiveType.BIGINT, true), + new Column("dictionary_page_offset", PrimitiveType.BIGINT, true), + new Column("total_compressed_size", PrimitiveType.BIGINT, true), + new Column("total_uncompressed_size", PrimitiveType.BIGINT, true), + new Column("statistics_min", ScalarType.createStringType(), true), + new Column("statistics_max", ScalarType.createStringType(), true) + ); + + private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("created_by", PrimitiveType.STRING, true), + new Column("num_rows", PrimitiveType.BIGINT, true), + new 
Column("num_row_groups", PrimitiveType.BIGINT, true), + new Column("format_version", PrimitiveType.BIGINT, true), + new Column("encryption_algorithm", PrimitiveType.STRING, true), + new Column("footer_signing_key_metadata", PrimitiveType.STRING, true) + ); + + private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("key", ScalarType.createStringType(), true), + new Column("value", ScalarType.createStringType(), true) + ); + + private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + new Column("bloom_filter_excludes", PrimitiveType.BOOLEAN, true) + ); + + private final List<String> paths; + private final String mode; + // File system info for remote Parquet access (e.g. S3). + private final TFileType fileType; + private final Map<String, String> properties; + private final String bloomColumn; + private final String bloomLiteral; + + public ParquetMetadataTableValuedFunction(Map<String, String> params) throws AnalysisException { + Map<String, String> normalizedParams = params.entrySet().stream().collect(Collectors.toMap( + entry -> entry.getKey().toLowerCase(), + Map.Entry::getValue, + (value1, value2) -> value2 + )); + String rawPath = normalizedParams.get(PATH); + if (Strings.isNullOrEmpty(rawPath)) { + throw new AnalysisException("Property 'path' is required for parquet_meta"); + } + String parsedPath = rawPath.trim(); + if (parsedPath.isEmpty()) { + throw new AnalysisException("Property 'path' must contain at least one location"); + } + List<String> parsedPaths = Collections.singletonList(parsedPath); + List<String> normalizedPaths = parsedPaths; + + String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA); + mode = rawMode.toLowerCase(); + if (!SUPPORTED_MODES.contains(mode)) { + throw new 
AnalysisException("Unsupported mode '" + rawMode + "' for parquet_meta"); + } + String tmpBloomColumn = null; + String tmpBloomLiteral = null; + if (MODE_BLOOM_PROBE.equals(mode)) { + tmpBloomColumn = normalizedParams.get(COLUMN); + if (Strings.isNullOrEmpty(tmpBloomColumn)) { + tmpBloomColumn = normalizedParams.get(COLUMN_NAME); + } + tmpBloomLiteral = normalizedParams.get(VALUE); + if (Strings.isNullOrEmpty(tmpBloomColumn) || Strings.isNullOrEmpty(tmpBloomLiteral)) { + throw new AnalysisException( + "Missing 'column' or 'value' for mode parquet_bloom_probe"); + } + tmpBloomColumn = tmpBloomColumn.trim(); + tmpBloomLiteral = tmpBloomLiteral.trim(); + if (tmpBloomColumn.isEmpty() || tmpBloomLiteral.isEmpty()) { + throw new AnalysisException( + "Missing 'column' or 'value' for mode parquet_bloom_probe"); + } + } + + String firstPath = parsedPaths.get(0); + String scheme = null; + try { + scheme = new URI(firstPath).getScheme(); + } catch (URISyntaxException e) { + scheme = null; + } + Map<String, String> storageParams = new HashMap<>(normalizedParams); + // StorageProperties detects provider by "uri", we also hint support by scheme. + if (!Strings.isNullOrEmpty(scheme)) { + storageParams.put("uri", firstPath); + } + if (Strings.isNullOrEmpty(scheme) || "file".equalsIgnoreCase(scheme)) { + storageParams.put(StorageProperties.FS_LOCAL_SUPPORT, "true"); + } else { + forceStorageSupport(storageParams, scheme.toLowerCase()); Review Comment: I don't think we need to force set `fs.xx.support`. `StorageProperties.createPrimary` can find the real storage type. ########## be/src/vec/exec/format/parquet/parquet_predicate.h: ########## @@ -177,13 +177,16 @@ class ParquetPredicate { // TINYINT/SMALLINT also need conversion via LittleIntPhysicalConverter. switch (type) { case TYPE_BOOLEAN: + case TYPE_TINYINT: Review Comment: Why are these new types being added? 
########## cloud/src/main.cpp: ########## @@ -136,7 +136,7 @@ static void help() { static std::string build_info() { std::stringstream ss; -// clang-format off + // clang-format off #if defined(NDEBUG) Review Comment: do not change this ########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java: ########## @@ -0,0 +1,474 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.datasource.property.storage.StorageProperties; +import org.apache.doris.proto.InternalService; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; +import org.apache.doris.thrift.TParquetMetadataParams; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Table-valued function parquet_meta for reading Parquet metadata. 
+ * Currently works in two modes: + * - parquet_meta (mode parquet_metadata): row-group/column statistics similar to DuckDB parquet_metadata() + * - parquet_schema: logical schema similar to DuckDB parquet_schema() + * - parquet_file_metadata: file-level metadata aligned with DuckDB parquet_file_metadata() + * - parquet_kv_metadata: file key/value metadata aligned with DuckDB parquet_kv_metadata() + * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB parquet_bloom_probe() + */ +public class ParquetMetadataTableValuedFunction extends MetadataTableValuedFunction { + + public static final String NAME = "parquet_meta"; + public static final String NAME_FILE_METADATA = "parquet_file_metadata"; + public static final String NAME_KV_METADATA = "parquet_kv_metadata"; + public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe"; + private static final String PATH = "path"; + private static final String MODE = "mode"; + private static final String COLUMN = "column"; + private static final String COLUMN_NAME = "column_name"; + private static final String VALUE = "value"; + + private static final String MODE_METADATA = "parquet_metadata"; + private static final String MODE_SCHEMA = "parquet_schema"; + private static final String MODE_FILE_METADATA = "parquet_file_metadata"; + private static final String MODE_KV_METADATA = "parquet_kv_metadata"; + private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe"; + private static final ImmutableSet<String> SUPPORTED_MODES = + ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, MODE_KV_METADATA, + MODE_BLOOM_PROBE); + + private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("column_name", PrimitiveType.STRING, true), + new Column("column_path", PrimitiveType.STRING, true), + new Column("physical_type", PrimitiveType.STRING, true), + new Column("logical_type", PrimitiveType.STRING, true), 
+ new Column("repetition_level", PrimitiveType.INT, true), + new Column("definition_level", PrimitiveType.INT, true), + new Column("type_length", PrimitiveType.INT, true), + new Column("precision", PrimitiveType.INT, true), + new Column("scale", PrimitiveType.INT, true), + new Column("is_nullable", PrimitiveType.BOOLEAN, true) + ); + + private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + new Column("column_id", PrimitiveType.INT, true), + new Column("column_name", PrimitiveType.STRING, true), + new Column("column_path", PrimitiveType.STRING, true), + new Column("physical_type", PrimitiveType.STRING, true), + new Column("logical_type", PrimitiveType.STRING, true), + new Column("type_length", PrimitiveType.INT, true), + new Column("converted_type", PrimitiveType.STRING, true), + new Column("num_values", PrimitiveType.BIGINT, true), + new Column("null_count", PrimitiveType.BIGINT, true), + new Column("distinct_count", PrimitiveType.BIGINT, true), + new Column("encodings", PrimitiveType.STRING, true), + new Column("compression", PrimitiveType.STRING, true), + new Column("data_page_offset", PrimitiveType.BIGINT, true), + new Column("index_page_offset", PrimitiveType.BIGINT, true), + new Column("dictionary_page_offset", PrimitiveType.BIGINT, true), + new Column("total_compressed_size", PrimitiveType.BIGINT, true), + new Column("total_uncompressed_size", PrimitiveType.BIGINT, true), + new Column("statistics_min", ScalarType.createStringType(), true), + new Column("statistics_max", ScalarType.createStringType(), true) + ); + + private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("created_by", PrimitiveType.STRING, true), + new Column("num_rows", PrimitiveType.BIGINT, true), + new Column("num_row_groups", 
PrimitiveType.BIGINT, true), + new Column("format_version", PrimitiveType.BIGINT, true), + new Column("encryption_algorithm", PrimitiveType.STRING, true), + new Column("footer_signing_key_metadata", PrimitiveType.STRING, true) + ); + + private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("key", ScalarType.createStringType(), true), + new Column("value", ScalarType.createStringType(), true) + ); + + private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + new Column("bloom_filter_excludes", PrimitiveType.BOOLEAN, true) + ); + + private final List<String> paths; + private final String mode; + // File system info for remote Parquet access (e.g. S3). + private final TFileType fileType; + private final Map<String, String> properties; + private final String bloomColumn; + private final String bloomLiteral; + + public ParquetMetadataTableValuedFunction(Map<String, String> params) throws AnalysisException { + Map<String, String> normalizedParams = params.entrySet().stream().collect(Collectors.toMap( + entry -> entry.getKey().toLowerCase(), + Map.Entry::getValue, + (value1, value2) -> value2 + )); + String rawPath = normalizedParams.get(PATH); + if (Strings.isNullOrEmpty(rawPath)) { + throw new AnalysisException("Property 'path' is required for parquet_meta"); + } + String parsedPath = rawPath.trim(); + if (parsedPath.isEmpty()) { + throw new AnalysisException("Property 'path' must contain at least one location"); + } + List<String> parsedPaths = Collections.singletonList(parsedPath); + List<String> normalizedPaths = parsedPaths; + + String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA); + mode = rawMode.toLowerCase(); + if (!SUPPORTED_MODES.contains(mode)) { + throw new AnalysisException("Unsupported mode '" + 
rawMode + "' for parquet_meta"); + } + String tmpBloomColumn = null; + String tmpBloomLiteral = null; + if (MODE_BLOOM_PROBE.equals(mode)) { + tmpBloomColumn = normalizedParams.get(COLUMN); + if (Strings.isNullOrEmpty(tmpBloomColumn)) { + tmpBloomColumn = normalizedParams.get(COLUMN_NAME); + } + tmpBloomLiteral = normalizedParams.get(VALUE); + if (Strings.isNullOrEmpty(tmpBloomColumn) || Strings.isNullOrEmpty(tmpBloomLiteral)) { + throw new AnalysisException( + "Missing 'column' or 'value' for mode parquet_bloom_probe"); + } + tmpBloomColumn = tmpBloomColumn.trim(); + tmpBloomLiteral = tmpBloomLiteral.trim(); + if (tmpBloomColumn.isEmpty() || tmpBloomLiteral.isEmpty()) { + throw new AnalysisException( + "Missing 'column' or 'value' for mode parquet_bloom_probe"); + } + } + + String firstPath = parsedPaths.get(0); + String scheme = null; + try { + scheme = new URI(firstPath).getScheme(); + } catch (URISyntaxException e) { + scheme = null; + } + Map<String, String> storageParams = new HashMap<>(normalizedParams); + // StorageProperties detects provider by "uri", we also hint support by scheme. 
+ if (!Strings.isNullOrEmpty(scheme)) { + storageParams.put("uri", firstPath); + } + if (Strings.isNullOrEmpty(scheme) || "file".equalsIgnoreCase(scheme)) { + storageParams.put(StorageProperties.FS_LOCAL_SUPPORT, "true"); + } else { + forceStorageSupport(storageParams, scheme.toLowerCase()); + } + StorageProperties storageProperties; + try { + storageProperties = StorageProperties.createPrimary(storageParams); + } catch (RuntimeException e) { + throw new AnalysisException( + "Failed to parse storage properties for parquet_meta: " + e.getMessage(), e); + } + this.fileType = mapToFileType(storageProperties.getType()); + Map<String, String> backendProps = storageProperties.getBackendConfigProperties(); + try { + List<String> tmpPaths = new ArrayList<>(parsedPaths.size()); + for (String path : parsedPaths) { + tmpPaths.add(storageProperties.validateAndNormalizeUri(path)); + } + normalizedPaths = tmpPaths; + } catch (UserException e) { + throw new AnalysisException( + "Failed to normalize parquet_meta paths: " + e.getMessage(), e); + } + if (this.fileType == TFileType.FILE_HTTP && !backendProps.containsKey("uri")) { + backendProps = new HashMap<>(backendProps); + backendProps.put("uri", normalizedPaths.get(0)); + } + this.properties = backendProps; + + // Expand any glob patterns (e.g. *.parquet) to concrete file paths. + normalizedPaths = expandGlobPaths(normalizedPaths, storageProperties, storageParams, this.fileType); + + this.paths = ImmutableList.copyOf(normalizedPaths); + this.bloomColumn = tmpBloomColumn; + this.bloomLiteral = tmpBloomLiteral; + } + + /** + * Expand wildcard paths to matching files. 
+ */ + private static List<String> expandGlobPaths(List<String> inputPaths, + StorageProperties storageProperties, + Map<String, String> storageParams, + TFileType fileType) throws AnalysisException { + if (inputPaths == null || inputPaths.isEmpty()) { + return Collections.emptyList(); + } + List<String> expanded = new ArrayList<>(); + for (String path : inputPaths) { + if (!containsWildcards(path)) { + expanded.add(path); + continue; + } + expanded.addAll(expandSingleGlob(path, storageProperties, storageParams, fileType)); + } + if (expanded.isEmpty()) { + throw new AnalysisException("No files matched parquet_meta path patterns: " + inputPaths); + } + return expanded; + } + + private static boolean containsWildcards(String path) { + if (Strings.isNullOrEmpty(path)) { + return false; + } + return path.contains("*") || path.contains("[") || path.contains("{"); + } + + private static List<String> expandSingleGlob(String pattern, + StorageProperties storageProperties, + Map<String, String> storageParams, + TFileType fileType) throws AnalysisException { + if (fileType == TFileType.FILE_LOCAL) { + return globLocal(pattern); Review Comment: Why not reuse `LocalTableValuedFunction.getFileListFromBackend()` to get file list? ########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java: ########## @@ -0,0 +1,474 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.datasource.property.storage.StorageProperties; +import org.apache.doris.proto.InternalService; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; +import org.apache.doris.thrift.TParquetMetadataParams; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Table-valued function parquet_meta for reading Parquet metadata. 
+ * Currently works in two modes: + * - parquet_meta (mode parquet_metadata): row-group/column statistics similar to DuckDB parquet_metadata() + * - parquet_schema: logical schema similar to DuckDB parquet_schema() + * - parquet_file_metadata: file-level metadata aligned with DuckDB parquet_file_metadata() + * - parquet_kv_metadata: file key/value metadata aligned with DuckDB parquet_kv_metadata() + * - parquet_bloom_probe: row group bloom filter probe aligned with DuckDB parquet_bloom_probe() + */ +public class ParquetMetadataTableValuedFunction extends MetadataTableValuedFunction { + + public static final String NAME = "parquet_meta"; + public static final String NAME_FILE_METADATA = "parquet_file_metadata"; + public static final String NAME_KV_METADATA = "parquet_kv_metadata"; + public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe"; + private static final String PATH = "path"; + private static final String MODE = "mode"; + private static final String COLUMN = "column"; + private static final String COLUMN_NAME = "column_name"; + private static final String VALUE = "value"; + + private static final String MODE_METADATA = "parquet_metadata"; + private static final String MODE_SCHEMA = "parquet_schema"; + private static final String MODE_FILE_METADATA = "parquet_file_metadata"; + private static final String MODE_KV_METADATA = "parquet_kv_metadata"; + private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe"; + private static final ImmutableSet<String> SUPPORTED_MODES = + ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, MODE_KV_METADATA, + MODE_BLOOM_PROBE); + + private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("column_name", PrimitiveType.STRING, true), + new Column("column_path", PrimitiveType.STRING, true), + new Column("physical_type", PrimitiveType.STRING, true), + new Column("logical_type", PrimitiveType.STRING, true), 
+ new Column("repetition_level", PrimitiveType.INT, true), + new Column("definition_level", PrimitiveType.INT, true), + new Column("type_length", PrimitiveType.INT, true), + new Column("precision", PrimitiveType.INT, true), + new Column("scale", PrimitiveType.INT, true), + new Column("is_nullable", PrimitiveType.BOOLEAN, true) + ); + + private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + new Column("column_id", PrimitiveType.INT, true), + new Column("column_name", PrimitiveType.STRING, true), + new Column("column_path", PrimitiveType.STRING, true), + new Column("physical_type", PrimitiveType.STRING, true), + new Column("logical_type", PrimitiveType.STRING, true), + new Column("type_length", PrimitiveType.INT, true), + new Column("converted_type", PrimitiveType.STRING, true), + new Column("num_values", PrimitiveType.BIGINT, true), + new Column("null_count", PrimitiveType.BIGINT, true), + new Column("distinct_count", PrimitiveType.BIGINT, true), + new Column("encodings", PrimitiveType.STRING, true), + new Column("compression", PrimitiveType.STRING, true), + new Column("data_page_offset", PrimitiveType.BIGINT, true), + new Column("index_page_offset", PrimitiveType.BIGINT, true), + new Column("dictionary_page_offset", PrimitiveType.BIGINT, true), + new Column("total_compressed_size", PrimitiveType.BIGINT, true), + new Column("total_uncompressed_size", PrimitiveType.BIGINT, true), + new Column("statistics_min", ScalarType.createStringType(), true), + new Column("statistics_max", ScalarType.createStringType(), true) + ); + + private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("created_by", PrimitiveType.STRING, true), + new Column("num_rows", PrimitiveType.BIGINT, true), + new Column("num_row_groups", 
PrimitiveType.BIGINT, true), + new Column("format_version", PrimitiveType.BIGINT, true), + new Column("encryption_algorithm", PrimitiveType.STRING, true), + new Column("footer_signing_key_metadata", PrimitiveType.STRING, true) + ); + + private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("key", ScalarType.createStringType(), true), + new Column("value", ScalarType.createStringType(), true) + ); + + private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = ImmutableList.of( + new Column("file_name", PrimitiveType.STRING, true), + new Column("row_group_id", PrimitiveType.INT, true), + new Column("bloom_filter_excludes", PrimitiveType.BOOLEAN, true) + ); + + private final List<String> paths; + private final String mode; + // File system info for remote Parquet access (e.g. S3). + private final TFileType fileType; + private final Map<String, String> properties; + private final String bloomColumn; + private final String bloomLiteral; + + public ParquetMetadataTableValuedFunction(Map<String, String> params) throws AnalysisException { + Map<String, String> normalizedParams = params.entrySet().stream().collect(Collectors.toMap( + entry -> entry.getKey().toLowerCase(), + Map.Entry::getValue, + (value1, value2) -> value2 + )); + String rawPath = normalizedParams.get(PATH); + if (Strings.isNullOrEmpty(rawPath)) { + throw new AnalysisException("Property 'path' is required for parquet_meta"); + } + String parsedPath = rawPath.trim(); + if (parsedPath.isEmpty()) { + throw new AnalysisException("Property 'path' must contain at least one location"); + } + List<String> parsedPaths = Collections.singletonList(parsedPath); + List<String> normalizedPaths = parsedPaths; + + String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA); + mode = rawMode.toLowerCase(); + if (!SUPPORTED_MODES.contains(mode)) { + throw new AnalysisException("Unsupported mode '" + 
rawMode + "' for parquet_meta"); + } + String tmpBloomColumn = null; + String tmpBloomLiteral = null; + if (MODE_BLOOM_PROBE.equals(mode)) { + tmpBloomColumn = normalizedParams.get(COLUMN); Review Comment: What is the difference between `COLUMN` and `COLUMN_NAME`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
