Copilot commented on code in PR #58972:
URL: https://github.com/apache/doris/pull/58972#discussion_r2618909758
##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+import org.apache.doris.thrift.TParquetMetadataParams;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Table-valued function for reading Parquet metadata.
+ * Currently works in two modes:
+ * - parquet_metadata: row-group/column statistics similar to DuckDB parquet_metadata()
+ * - parquet_schema: logical schema similar to DuckDB parquet_schema()
+ */
+public class ParquetMetadataTableValuedFunction extends MetadataTableValuedFunction {
+
+    public static final String NAME = "parquet_meta";

Review Comment:
   The class documentation describes the modes as "parquet_metadata" and "parquet_schema", which matches the internal implementation, but the function itself is named "parquet_meta" (line 64). Consider updating the documentation to clarify that the function is called "parquet_meta" and accepts a "mode" parameter whose value is "parquet_metadata" or "parquet_schema".
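   A possible Javadoc rewording that spells out the function name and the mode values (a sketch only; the exact wording is the author's call):

```java
/**
 * Table-valued function "parquet_meta" for reading Parquet metadata.
 * The "mode" parameter selects the output shape:
 * - "parquet_metadata" (default): row-group/column statistics, similar to
 *   DuckDB parquet_metadata()
 * - "parquet_schema": logical schema, similar to DuckDB parquet_schema()
 */
```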
##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,373 @@
+public class ParquetMetadataTableValuedFunction extends MetadataTableValuedFunction {
+
+    public static final String NAME = "parquet_meta";
+    private static final String PATH = "path";
+    private static final String MODE = "mode";
+
+    private static final String MODE_METADATA = "parquet_metadata";
+    private static final String MODE_SCHEMA = "parquet_schema";
+    private static final ImmutableSet<String> SUPPORTED_MODES =
+            ImmutableSet.of(MODE_METADATA, MODE_SCHEMA);
+
+    private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("repetition_level", PrimitiveType.INT, true),
+            new Column("definition_level", PrimitiveType.INT, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("precision", PrimitiveType.INT, true),
+            new Column("scale", PrimitiveType.INT, true),
+            new Column("is_nullable", PrimitiveType.BOOLEAN, true)
+    );
+
+    private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = ImmutableList.of(
+            new Column("file_name", PrimitiveType.STRING, true),
+            new Column("row_group_id", PrimitiveType.INT, true),
+            new Column("column_id", PrimitiveType.INT, true),
+            new Column("column_name", PrimitiveType.STRING, true),
+            new Column("column_path", PrimitiveType.STRING, true),
+            new Column("physical_type", PrimitiveType.STRING, true),
+            new Column("logical_type", PrimitiveType.STRING, true),
+            new Column("type_length", PrimitiveType.INT, true),
+            new Column("converted_type", PrimitiveType.STRING, true),
+            new Column("num_values", PrimitiveType.BIGINT, true),
+            new Column("null_count", PrimitiveType.BIGINT, true),
+            new Column("distinct_count", PrimitiveType.BIGINT, true),
+            new Column("encodings", PrimitiveType.STRING, true),
+            new Column("compression", PrimitiveType.STRING, true),
+            new Column("data_page_offset", PrimitiveType.BIGINT, true),
+            new Column("index_page_offset", PrimitiveType.BIGINT, true),
+            new Column("dictionary_page_offset", PrimitiveType.BIGINT, true),
+            new Column("total_compressed_size", PrimitiveType.BIGINT, true),
+            new Column("total_uncompressed_size", PrimitiveType.BIGINT, true),
+            new Column("statistics_min", ScalarType.createStringType(), true),
+            new Column("statistics_max", ScalarType.createStringType(), true)
+    );
+
+    private final List<String> paths;
+    private final String mode;
+    // File system info for remote Parquet access (e.g. S3).
+    private final TFileType fileType;
+    private final Map<String, String> properties;
+
+    public ParquetMetadataTableValuedFunction(Map<String, String> params) throws AnalysisException {
+        Map<String, String> normalizedParams = params.entrySet().stream().collect(Collectors.toMap(
+                entry -> entry.getKey().toLowerCase(),
+                Map.Entry::getValue,
+                (value1, value2) -> value2
+        ));
+        String rawPath = normalizedParams.get(PATH);
+        if (Strings.isNullOrEmpty(rawPath)) {
+            throw new AnalysisException("Property 'path' is required for parquet_metadata");
+        }
+        List<String> parsedPaths = Arrays.stream(rawPath.split(","))
+                .map(String::trim)
+                .filter(token -> !token.isEmpty())
+                .collect(Collectors.toList());
+        if (parsedPaths.isEmpty()) {
+            throw new AnalysisException("Property 'path' must contain at least one location");
+        }
+        List<String> normalizedPaths = parsedPaths;
+
+        String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA);
+        mode = rawMode.toLowerCase();
+        if (!SUPPORTED_MODES.contains(mode)) {
+            throw new AnalysisException("Unsupported mode '" + rawMode + "' for parquet_metadata");
+        }
+
+        String firstPath = parsedPaths.get(0);
+        String scheme = null;
+        try {
+            scheme = new URI(firstPath).getScheme();
+        } catch (URISyntaxException e) {
+            scheme = null;
+        }
+        StorageProperties storageProperties = null;
+        if (Strings.isNullOrEmpty(scheme) || "file".equalsIgnoreCase(scheme)) {
+            this.fileType = TFileType.FILE_LOCAL;
+            this.properties = Collections.emptyMap();
+        } else {
+            Map<String, String> storageParams = new HashMap<>(normalizedParams);
+            // StorageProperties detects provider by "uri", we also hint support by scheme.
+            storageParams.put("uri", firstPath);
+            forceStorageSupport(storageParams, scheme.toLowerCase());
+            try {
+                storageProperties = StorageProperties.createPrimary(storageParams);
+            } catch (RuntimeException e) {
+                throw new AnalysisException(
+                        "Failed to parse storage properties for parquet_metadata: " + e.getMessage(), e);
+            }
+            this.fileType = mapToFileType(storageProperties.getType());
+            Map<String, String> backendProps = storageProperties.getBackendConfigProperties();
+            try {
+                List<String> tmpPaths = new ArrayList<>(parsedPaths.size());
+                for (String path : parsedPaths) {
+                    tmpPaths.add(storageProperties.validateAndNormalizeUri(path));
+                }
+                normalizedPaths = tmpPaths;
+            } catch (UserException e) {
+                throw new AnalysisException(
+                        "Failed to normalize parquet_metadata paths: " + e.getMessage(), e);
+            }
+            if (this.fileType == TFileType.FILE_HTTP && !backendProps.containsKey("uri")) {
+                backendProps = new HashMap<>(backendProps);
+                backendProps.put("uri", normalizedPaths.get(0));
+            }
+            this.properties = backendProps;
+        }
+
+        // Expand any glob patterns (e.g. *.parquet) to concrete file paths.
+        normalizedPaths = expandGlobPaths(normalizedPaths, storageProperties, this.fileType);
+
+        this.paths = ImmutableList.copyOf(normalizedPaths);
+    }

Review Comment:
   The new ParquetMetadataTableValuedFunction lacks test coverage. Given that other table-valued functions in the repository have test coverage (e.g., ExternalFileTableValuedFunctionTest.java), unit tests should be added to verify the function's behavior, including parameter validation, path expansion, mode handling, and error cases.
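   A starting point could look like the following sketch. It assumes JUnit 4.13+ (for Assert.assertThrows) and that the test class lives in the same package as the function; it only exercises validation paths that never touch storage or a backend:

```java
package org.apache.doris.tablefunction;

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.common.AnalysisException;
import org.junit.Assert;
import org.junit.Test;

public class ParquetMetadataTableValuedFunctionTest {

    @Test
    public void testMissingPathIsRejected() {
        // 'path' is mandatory; the constructor should fail before any I/O.
        Map<String, String> params = new HashMap<>();
        params.put("mode", "parquet_schema");
        Assert.assertThrows(AnalysisException.class,
                () -> new ParquetMetadataTableValuedFunction(params));
    }

    @Test
    public void testUnsupportedModeIsRejected() {
        // Only "parquet_metadata" and "parquet_schema" are accepted; the mode
        // check fires before any storage handling, so a plain local path is fine.
        Map<String, String> params = new HashMap<>();
        params.put("path", "/tmp/data.parquet");
        params.put("mode", "bogus_mode");
        Assert.assertThrows(AnalysisException.class,
                () -> new ParquetMetadataTableValuedFunction(params));
    }
}
```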
##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,373 @@
+    /**
+     * Expand wildcard paths to matching files.
+     */
+    private static List<String> expandGlobPaths(List<String> inputPaths,
+                                                StorageProperties storageProperties,
+                                                TFileType fileType) throws AnalysisException {
+        if (inputPaths == null || inputPaths.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<String> expanded = new ArrayList<>();
+        for (String path : inputPaths) {
+            if (!containsWildcards(path)) {
+                expanded.add(path);
+                continue;
+            }
+            expanded.addAll(expandSingleGlob(path, storageProperties, fileType));
+        }
+        if (expanded.isEmpty()) {
+            throw new AnalysisException("No files matched parquet_metadata path patterns: " + inputPaths);
+        }
+        return expanded;
+    }
+
+    private static boolean containsWildcards(String path) {
+        if (Strings.isNullOrEmpty(path)) {
+            return false;
+        }
+        return path.contains("*") || path.contains("?") || path.contains("[") || path.contains("{");
+    }
+
+    private static List<String> expandSingleGlob(String pattern,
+                                                 StorageProperties storageProperties,
+                                                 TFileType fileType) throws AnalysisException {
+        if (fileType == TFileType.FILE_LOCAL) {
+            return globLocal(pattern);
+        }
+        if (storageProperties == null) {
+            throw new AnalysisException("Storage properties is required for glob pattern: " + pattern);
+        }
+        if (fileType == TFileType.FILE_S3 || fileType == TFileType.FILE_HDFS) {
+            return globRemote(storageProperties, pattern);
+        }
+        throw new AnalysisException("Glob patterns are not supported for file type: " + fileType);
+    }
+
+    private static List<String> globRemote(StorageProperties storageProperties,
+                                           String pattern) throws AnalysisException {
+        List<RemoteFile> remoteFiles = new ArrayList<>();
+        try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) {
+            Status status = fileSystem.globList(pattern, remoteFiles, false);
+            if (!status.ok()) {
+                throw new AnalysisException(status.getErrMsg());
+            }
+        } catch (Exception e) {
+            throw new AnalysisException("Failed to expand glob pattern '" + pattern + "': "
+                    + e.getMessage(), e);
+        }
+        return remoteFiles.stream()
+                .filter(RemoteFile::isFile)
+                .map(RemoteFile::getName)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Local glob expansion is executed on an arbitrary alive backend, because FE may not
+     * have access to BE local file system.
+     */
+    private static List<String> globLocal(String pattern) throws AnalysisException {
+        Backend backend;
+        try {
+            backend = Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values().stream()
+                    .filter(Backend::isAlive)
+                    .findFirst()
+                    .orElse(null);
+        } catch (AnalysisException e) {
+            throw e;
+        }

Review Comment:
   The catch block unnecessarily catches and re-throws the same AnalysisException without adding any value. The try-catch block should be removed, since getBackendsByCurrentCluster() already throws AnalysisException, which will propagate naturally.
```suggestion
        backend = Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values().stream()
                .filter(Backend::isAlive)
                .findFirst()
                .orElse(null);
```
##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/ParquetMetadataTableValuedFunction.java:
##########
@@ -0,0 +1,373 @@
+        if (backend == null) {
+            throw new AnalysisException("No alive backends to expand local glob pattern");
+        }
+        BackendServiceProxy proxy = BackendServiceProxy.getInstance();
+        InternalService.PGlobRequest.Builder requestBuilder = InternalService.PGlobRequest.newBuilder();
+        requestBuilder.setPattern(pattern);
+        long timeoutS = ConnectContext.get() == null
+                ? 60
+                : Math.min(ConnectContext.get().getQueryTimeoutS(), 60);
+        try {
+            Future<InternalService.PGlobResponse> response =
+                    proxy.glob(backend.getBrpcAddress(), requestBuilder.build());
+            InternalService.PGlobResponse globResponse =
+                    response.get(timeoutS, TimeUnit.SECONDS);
+            if (globResponse.getStatus().getStatusCode() != 0) {
+                throw new AnalysisException("Expand local glob pattern failed: "
+                        + globResponse.getStatus().getErrorMsgsList());
+            }
+            List<String> result = new ArrayList<>();
+            for (InternalService.PGlobResponse.PFileInfo fileInfo : globResponse.getFilesList()) {
+                result.add(fileInfo.getFile().trim());
+            }
+            return result;
+        } catch (Exception e) {
+            throw new AnalysisException("Failed to expand local glob pattern '" + pattern + "': "
+                    + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Add fs.* support hints so StorageProperties can be created from URI-only inputs.
+     */
+    private static void forceStorageSupport(Map<String, String> params, String scheme) {
+        if ("s3".equals(scheme) || "s3a".equals(scheme) || "s3n".equals(scheme)) {
+            params.put(StorageProperties.FS_S3_SUPPORT, "true");
+        } else if ("oss".equals(scheme)) {
+            params.put(StorageProperties.FS_OSS_SUPPORT, "true");
+        } else if ("obs".equals(scheme)) {
+            params.put(StorageProperties.FS_OBS_SUPPORT, "true");
+        } else if ("cos".equals(scheme)) {
+            params.put(StorageProperties.FS_COS_SUPPORT, "true");
+        } else if ("gs".equals(scheme) || "gcs".equals(scheme)) {
+            params.put(StorageProperties.FS_GCS_SUPPORT, "true");
+        } else if ("minio".equals(scheme)) {
+            params.put(StorageProperties.FS_MINIO_SUPPORT, "true");
+        } else if ("azure".equals(scheme)) {
+            params.put(StorageProperties.FS_AZURE_SUPPORT, "true");
+        } else if ("http".equals(scheme) || "https".equals(scheme) || "hf".equals(scheme)) {
+            params.put(StorageProperties.FS_HTTP_SUPPORT, "true");
+        }
+    }
+
+    /**
+     * Map FE storage type to BE file type.
+     */
+    private static TFileType mapToFileType(StorageProperties.Type type) throws AnalysisException {
+        switch (type) {
+            case HDFS:
+            case OSS_HDFS:
+                return TFileType.FILE_HDFS;
+            case HTTP:
+                return TFileType.FILE_HTTP;
+            case LOCAL:
+                return TFileType.FILE_LOCAL;
+            case S3:
+            case OSS:
+            case OBS:
+            case COS:
+            case GCS:
+            case MINIO:
+            case AZURE:
+                return TFileType.FILE_S3;
+            default:
+                throw new AnalysisException("Unsupported storage type for parquet_metadata: " + type);

Review Comment:
   Error messages reference "parquet_metadata" instead of the actual function name "parquet_meta". This inconsistency appears in multiple error messages (lines 125, 132, 139, 162, 174, 207, 226, 338). All error messages should use the actual function name "parquet_meta" for clarity.
```suggestion
                throw new AnalysisException("Unsupported storage type for parquet_meta: " + type);
```
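   If the literal is worth centralizing, the existing NAME constant could be reused so the messages cannot drift again. A fragment-level sketch for one of the throw sites, not a required change:

```java
// Sketch: build the message from the class's NAME constant ("parquet_meta")
// instead of repeating the literal at each throw site.
throw new AnalysisException("Unsupported storage type for " + NAME + ": " + type);
```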
##########
sbe:
##########
@@ -0,0 +1,7 @@
+#!/bin/bash
+set -euo pipefail
+
+unset http_proxy
+unset https_proxy
+cd /mnt/disk1/chenjunwei/doris_build/doris/output/be
+./bin/start_be.sh --daemon

Review Comment:
   This file appears to be a personal development script with hardcoded paths (e.g., `/mnt/disk1/chenjunwei/doris_build/doris/output/be`) and should not be included in the PR. It should be removed from the repository.
```suggestion
```

##########
be/src/vec/exec/format/table/parquet_metadata_reader.h:
##########
@@ -0,0 +1,82 @@
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/factory_creator.h"
+#include "common/status.h"
+#include "runtime/descriptors.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+namespace io {
+class FileReader;
+} // namespace io
+} // namespace doris
+
+namespace doris::vectorized {
+class Block;
+
+// Lightweight reader that surfaces Parquet footer metadata as a table-valued scan.
+// It reads only file footers (no data pages) and emits either schema rows or
+// row-group/column statistics based on `mode`.
+class ParquetMetadataReader : public GenericReader {
+    ENABLE_FACTORY_CREATOR(ParquetMetadataReader);
+
+public:
+    class ModeHandler;
+
+    ParquetMetadataReader(std::vector<SlotDescriptor*> slots, RuntimeState* state,
+                          RuntimeProfile* profile, TMetaScanRange scan_range);
+    ~ParquetMetadataReader() override;
+
+    Status init_reader();
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+    Status close() override;
+
+private:
+
+

Review Comment:
   There are extra blank lines between the private access specifier and the first private method declaration. Remove one of the blank lines for consistency with the codebase style.
```suggestion

```

##########
be/src/vec/exec/format/table/parquet_metadata_reader.cpp:
##########
@@ -0,0 +1,908 @@
+#include "vec/exec/format/table/parquet_metadata_reader.h"
+
+#include <fmt/format.h>
+
+#include <array>
+#include <algorithm>
+#include <cctype>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+
+#include "common/logging.h"
+#include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/local_file_system.h"
+#include "io/io_common.h"
+#include "runtime/runtime_state.h"
+#include "util/string_util.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/unaligned.h"
+#include "vec/core/block.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/exec/format/parquet/parquet_column_convert.h"
+#include "vec/exec/format/parquet/parquet_thrift_util.h"
+#include "vec/exec/format/parquet/schema_desc.h"
+#include "vec/exec/format/parquet/vparquet_file_metadata.h"
+
+namespace doris::vectorized {
+namespace {
+
+constexpr const char* MODE_SCHEMA = "parquet_schema";
+constexpr const char* MODE_METADATA = "parquet_metadata";
+
+enum SchemaColumnIndex : size_t {
+    SCHEMA_FILE_NAME = 0,
+    SCHEMA_COLUMN_NAME,
+    SCHEMA_COLUMN_PATH,
+    SCHEMA_PHYSICAL_TYPE,
+    SCHEMA_LOGICAL_TYPE,
+    SCHEMA_REPETITION_LEVEL,
+    SCHEMA_DEFINITION_LEVEL,
+    SCHEMA_TYPE_LENGTH,
+    SCHEMA_PRECISION,
+    SCHEMA_SCALE,
+    SCHEMA_IS_NULLABLE,
+    SCHEMA_COLUMN_COUNT
+};
+
+enum MetadataColumnIndex : size_t {
+    META_FILE_NAME = 0,
+    META_ROW_GROUP_ID,
+    META_COLUMN_ID,
+    META_COLUMN_NAME,
+    META_COLUMN_PATH,
+    META_PHYSICAL_TYPE,
+    META_LOGICAL_TYPE,
+    META_TYPE_LENGTH,
+    META_CONVERTED_TYPE,
+    META_NUM_VALUES,
+    META_NULL_COUNT,
+    META_DISTINCT_COUNT,
+    META_ENCODINGS,
+    META_COMPRESSION,
+    META_DATA_PAGE_OFFSET,
+    META_INDEX_PAGE_OFFSET,
+    META_DICTIONARY_PAGE_OFFSET,
+    META_TOTAL_COMPRESSED_SIZE,
+    META_TOTAL_UNCOMPRESSED_SIZE,
+    META_STATISTICS_MIN,
+    META_STATISTICS_MAX,
+    META_COLUMN_COUNT
+};
+
+constexpr std::array<const char*, SCHEMA_COLUMN_COUNT> kSchemaColumnNames = {
+        "file_name", "column_name", "column_path", "physical_type",
+        "logical_type", "repetition_level", "definition_level", "type_length",
+        "precision", "scale", "is_nullable"};
+
+static_assert(kSchemaColumnNames.size() == SCHEMA_COLUMN_COUNT);
+
+constexpr std::array<const char*, META_COLUMN_COUNT> kMetadataColumnNames = {
+        "file_name", "row_group_id", "column_id", "column_name", "column_path", "physical_type",
+        "logical_type", "type_length", "converted_type", "num_values", "null_count",
+        "distinct_count", "encodings", "compression", "data_page_offset", "index_page_offset",
+        "dictionary_page_offset", "total_compressed_size", "total_uncompressed_size",
+        "statistics_min", "statistics_max"};
+
+static_assert(kMetadataColumnNames.size() == META_COLUMN_COUNT);
+
+std::string join_path(const std::vector<std::string>& items) {
+    return join(items, ".");
+}
+
+template <typename ColumnType, typename T>
+void insert_numeric_impl(MutableColumnPtr& column, T value) {
+    if (auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
+        nullable->get_null_map_data().push_back(0);
+        auto& nested = nullable->get_nested_column();
+        assert_cast<ColumnType&>(nested).insert_value(value);
+    } else {
+        assert_cast<ColumnType&>(*column).insert_value(value);
+    }
+}
+
+void insert_int32(MutableColumnPtr& column, Int32 value) {
+    insert_numeric_impl<ColumnInt32>(column, value);
+}
+
+void insert_int64(MutableColumnPtr& column, Int64 value) {
+    insert_numeric_impl<ColumnInt64>(column, value);
+}
+
+void insert_bool(MutableColumnPtr& column, bool value) {
+    insert_numeric_impl<ColumnUInt8>(column, static_cast<UInt8>(value));
+}
+
+void insert_string(MutableColumnPtr& column, const std::string& value) {
+    if (auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
+        nullable->get_null_map_data().push_back(0);
+        auto& nested = nullable->get_nested_column();
+        assert_cast<ColumnString&>(nested).insert_data(value.c_str(), value.size());
+    } else {
+        assert_cast<ColumnString&>(*column).insert_data(value.c_str(), value.size());
+    }
+}
+
+void insert_null(MutableColumnPtr& column) {
+    if (auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
+        nullable->get_null_map_data().push_back(1);
+        nullable->get_nested_column().insert_default();
+    } else {
+        column->insert_default();
+    }
+}
+
+std::string physical_type_to_string(tparquet::Type::type type) {
+    switch (type) {
+    case tparquet::Type::BOOLEAN:
+        return "BOOLEAN";
+    case tparquet::Type::INT32:
+        return "INT32";
+    case tparquet::Type::INT64:
+        return "INT64";
+    case tparquet::Type::INT96:
+        return "INT96";
+    case tparquet::Type::FLOAT:
+        return "FLOAT";
+    case tparquet::Type::DOUBLE:
+        return "DOUBLE";
+    case tparquet::Type::BYTE_ARRAY:
+        return "BYTE_ARRAY";
+    case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
+        return "FIXED_LEN_BYTE_ARRAY";
+    default:
+        return "UNKNOWN";
+    }
+}
+
+std::string compression_to_string(tparquet::CompressionCodec::type codec) {
+    switch (codec) {
+    case tparquet::CompressionCodec::UNCOMPRESSED:
+        return "UNCOMPRESSED";
+    case tparquet::CompressionCodec::SNAPPY:
+        return "SNAPPY";
+    case tparquet::CompressionCodec::GZIP:
+        return "GZIP";
+    case tparquet::CompressionCodec::LZO:
+        return "LZO";
+    case tparquet::CompressionCodec::BROTLI:
+        return "BROTLI";
+    case tparquet::CompressionCodec::LZ4:
+        return "LZ4";
+    case tparquet::CompressionCodec::ZSTD:
+        return "ZSTD";
+    case tparquet::CompressionCodec::LZ4_RAW:
+        return "LZ4_RAW";
+    default:
+        return "UNKNOWN";
+    }
+}
+
+std::string converted_type_to_string(tparquet::ConvertedType::type type) {
+    switch (type) {
+    case tparquet::ConvertedType::UTF8:
+        return "UTF8";
+    case tparquet::ConvertedType::MAP:
+        return "MAP";
+    case tparquet::ConvertedType::MAP_KEY_VALUE:
+        return "MAP_KEY_VALUE";
+    case tparquet::ConvertedType::LIST:
+        return "LIST";
+    case tparquet::ConvertedType::ENUM:
+        return "ENUM";
+    case tparquet::ConvertedType::DECIMAL:
+        return "DECIMAL";
+    case tparquet::ConvertedType::DATE:
+        return "DATE";
+    case tparquet::ConvertedType::TIME_MILLIS:
+        return "TIME_MILLIS";
+    case tparquet::ConvertedType::TIME_MICROS:
+        return "TIME_MICROS";
+    case tparquet::ConvertedType::TIMESTAMP_MILLIS:
+        return "TIMESTAMP_MILLIS";
+    case tparquet::ConvertedType::TIMESTAMP_MICROS:
+        return "TIMESTAMP_MICROS";
+    case tparquet::ConvertedType::UINT_8:
+        return "UINT_8";
+    case tparquet::ConvertedType::UINT_16:
+        return "UINT_16";
+    case tparquet::ConvertedType::UINT_32:
+        return "UINT_32";
+    case tparquet::ConvertedType::UINT_64:
+        return "UINT_64";
+    case tparquet::ConvertedType::INT_8:
+        return "INT_8";
+    case tparquet::ConvertedType::INT_16:
+        return "INT_16";
+    case tparquet::ConvertedType::INT_32:
+        return "INT_32";
+    case tparquet::ConvertedType::INT_64:
+        return "INT_64";
+    case tparquet::ConvertedType::JSON:
+        return "JSON";
+    case tparquet::ConvertedType::BSON:
+        return "BSON";
+    case tparquet::ConvertedType::INTERVAL:
+        return "INTERVAL";
+    default:
+        return "UNKNOWN";
+    }
+}
+
+std::string logical_type_to_string(const tparquet::SchemaElement& element) {
+    if (element.__isset.logicalType) {
+        const auto& logical = element.logicalType;
+        if (logical.__isset.STRING) {
+            return "STRING";
+        } else if (logical.__isset.MAP) {
+            return "MAP";
+        } else if (logical.__isset.LIST) {
+            return "LIST";
+        } else if (logical.__isset.ENUM) {
+            return "ENUM";
+        } else if (logical.__isset.DECIMAL) {
+            return "DECIMAL";
+        } else if (logical.__isset.DATE) {
+            return "DATE";
+        } else if (logical.__isset.TIME) {
+            return "TIME";
+        } else if (logical.__isset.TIMESTAMP) {
+            return "TIMESTAMP";
+        } else if (logical.__isset.INTEGER) {
+            return "INTEGER";
+        } else if (logical.__isset.UNKNOWN) {
+            return "UNKNOWN";
+        } else if (logical.__isset.JSON) {
+            return "JSON";
+        } else if (logical.__isset.BSON) {
+            return "BSON";
+        } else if (logical.__isset.UUID) {
+            return "UUID";
+        } else if (logical.__isset.FLOAT16) {
+            return "FLOAT16";
+        } else if (logical.__isset.VARIANT) {
+            return "VARIANT";
+        } else if (logical.__isset.GEOMETRY) {
+            return "GEOMETRY";
+        } else if (logical.__isset.GEOGRAPHY) {
+            return "GEOGRAPHY";
+        }
+    }
+    if (element.__isset.converted_type) {
+        return converted_type_to_string(element.converted_type);
+    }
+    return "";
+}
+
+std::string encodings_to_string(const std::vector<tparquet::Encoding::type>& encodings) {
+    std::vector<std::string> parts;
+    parts.reserve(encodings.size());
+    for (auto encoding : encodings) {
+        switch (encoding) {
+        case tparquet::Encoding::PLAIN:
+            parts.emplace_back("PLAIN");
+            break;
+        case tparquet::Encoding::PLAIN_DICTIONARY:
+            parts.emplace_back("PLAIN_DICTIONARY");
+            break;
+        case tparquet::Encoding::RLE:
+            parts.emplace_back("RLE");
+            break;
+        case tparquet::Encoding::BIT_PACKED:
+            parts.emplace_back("BIT_PACKED");
+            break;
+        case tparquet::Encoding::DELTA_BINARY_PACKED:
+            parts.emplace_back("DELTA_BINARY_PACKED");
+            break;
+        case tparquet::Encoding::DELTA_LENGTH_BYTE_ARRAY:
+            parts.emplace_back("DELTA_LENGTH_BYTE_ARRAY");
+            break;
+        case tparquet::Encoding::DELTA_BYTE_ARRAY:
+            parts.emplace_back("DELTA_BYTE_ARRAY");
+            break;
+        case tparquet::Encoding::RLE_DICTIONARY:
+            parts.emplace_back("RLE_DICTIONARY");
+            break;
+        default:
+            parts.emplace_back("UNKNOWN");
+            break;
+        }
+    }
+    return fmt::format("{}", fmt::join(parts, ","));
+}
+
+bool try_get_statistics_encoded_value(const tparquet::Statistics& statistics, bool is_min,
+                                      std::string* encoded_value) {
+    if (is_min) {
+        if (statistics.__isset.min_value) {
+            *encoded_value = statistics.min_value;
+            return true;
+        }
+        if (statistics.__isset.min) {
+            *encoded_value = statistics.min;
+            return true;
+        }
+    } else {
+        if (statistics.__isset.max_value) {
+            *encoded_value = statistics.max_value;
+            return true;
+        }
+        if (statistics.__isset.max) {
+            *encoded_value = statistics.max;
+            return true;
+        }
+    }
+    encoded_value->clear();
+    return false;
+}
+
+std::string bytes_to_hex_string(const std::string& bytes) {
+    static constexpr char kHexDigits[] = "0123456789ABCDEF";
+    std::string hex;
+    hex.resize(bytes.size() * 2);
+    for (size_t i = 0; i < bytes.size(); ++i) {
+        auto byte = static_cast<uint8_t>(bytes[i]);
+        hex[i * 2] = kHexDigits[byte >> 4];
+        hex[i * 2 + 1] = kHexDigits[byte & 0x0F];
+    }
+    return fmt::format("0x{}", hex);
+}
+
+std::string decode_statistics_value(const FieldSchema* schema_field, tparquet::Type::type physical_type,
+                                    const std::string& encoded_value,
+                                    const cctz::time_zone& ctz) {
+    if (encoded_value.empty()) {
+        return "";
+    }
+    if (schema_field == nullptr) {
+        return bytes_to_hex_string(encoded_value);
+    }
+
+    auto logical_data_type = remove_nullable(schema_field->data_type);
+    auto converter = parquet::PhysicalToLogicalConverter::get_converter(
+            schema_field, logical_data_type, logical_data_type, &ctz);
+    if (!converter || !converter->support()) {
+        return bytes_to_hex_string(encoded_value);
+    }
+
+    ColumnPtr physical_column;
+    switch (physical_type) {
+    case tparquet::Type::type::BOOLEAN: {
+        if (encoded_value.size() != sizeof(UInt8)) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnUInt8::create();
+        physical_col->insert_value(doris::unaligned_load<UInt8>(encoded_value.data()));
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::INT32: {
+        if (encoded_value.size() != sizeof(Int32)) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnInt32::create();
+        physical_col->insert_value(doris::unaligned_load<Int32>(encoded_value.data()));
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::INT64: {
+        if (encoded_value.size() != sizeof(Int64)) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnInt64::create();
+        physical_col->insert_value(doris::unaligned_load<Int64>(encoded_value.data()));
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::FLOAT: {
+        if (encoded_value.size() != sizeof(Float32)) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnFloat32::create();
+        physical_col->insert_value(doris::unaligned_load<Float32>(encoded_value.data()));
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::DOUBLE: {
+        if (encoded_value.size() != sizeof(Float64)) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnFloat64::create();
+        physical_col->insert_value(doris::unaligned_load<Float64>(encoded_value.data()));
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::BYTE_ARRAY: {
+        auto physical_col = ColumnString::create();
+        physical_col->insert_data(encoded_value.data(), encoded_value.size());
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::FIXED_LEN_BYTE_ARRAY: {
+        int32_t type_length = schema_field->parquet_schema.__isset.type_length
+                                      ? schema_field->parquet_schema.type_length
+                                      : 0;
+        if (type_length <= 0) {
+            type_length = static_cast<int32_t>(encoded_value.size());
+        }
+        if (static_cast<size_t>(type_length) != encoded_value.size()) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnUInt8::create();
+        physical_col->resize(type_length);
+        memcpy(physical_col->get_data().data(), encoded_value.data(), encoded_value.size());
+        physical_column = std::move(physical_col);
+        break;
+    }
+    case tparquet::Type::type::INT96: {
+        constexpr size_t kInt96Size = 12;
+        if (encoded_value.size() != kInt96Size) {
+            return bytes_to_hex_string(encoded_value);
+        }
+        auto physical_col = ColumnInt8::create();
+        physical_col->resize(kInt96Size);
+        memcpy(physical_col->get_data().data(), encoded_value.data(), encoded_value.size());
+        physical_column = std::move(physical_col);
+        break;
+    }
+    default:
+        return bytes_to_hex_string(encoded_value);
+    }
+
+    ColumnPtr logical_column;
+    if (converter->is_consistent()) {
+        logical_column = physical_column;
+    } else {
+        logical_column = logical_data_type->create_column();
+        if (Status st = converter->physical_convert(physical_column, logical_column); !st.ok()) {
+            return bytes_to_hex_string(encoded_value);
+        }
+    }
+
+    if (logical_column->size() != 1) {
+        return bytes_to_hex_string(encoded_value);
+    }
+    return logical_data_type->to_string(*logical_column, 0);
+}
+
+void build_path_map(const FieldSchema& field, const std::string& prefix,
+                    std::unordered_map<std::string, const FieldSchema*>* map) {
+    std::string current = prefix.empty() ? field.name : fmt::format("{}.{}", prefix, field.name);
+    if (field.children.empty()) {
+        (*map)[current] = &field;
+    } else {
+        for (const auto& child : field.children) {
+            build_path_map(child, current, map);
+        }
+    }
+}
+
+} // namespace
+
+class ParquetMetadataReader::ModeHandler {
+public:
+    explicit ModeHandler(RuntimeState* state) : _state(state) {}
+    virtual ~ModeHandler() = default;
+
+    virtual void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) = 0;
+    virtual Status append_rows(const std::string& path, FileMetaData* metadata,
+                               std::vector<MutableColumnPtr>& columns) = 0;
+
+protected:
+    RuntimeState* _state = nullptr;
+
+    static std::unordered_map<std::string, int> _build_name_to_pos_map(
+            const std::vector<SlotDescriptor*>& slots) {
+        std::unordered_map<std::string, int> name_to_pos;
+        name_to_pos.reserve(slots.size());
+        for (size_t i = 0; i < slots.size(); ++i) {
+            name_to_pos.emplace(to_lower(slots[i]->col_name()), static_cast<int>(i));
+        }
+        return name_to_pos;
+    }
+
+    template <size_t N>
+    static void _init_slot_pos_map(const std::unordered_map<std::string, int>& name_to_pos,
+                                   const std::array<const char*, N>& column_names,
+                                   std::array<int, N>* slot_pos) {
+        slot_pos->fill(-1);
+        for (size_t i = 0; i < column_names.size(); ++i) {
+            auto it = name_to_pos.find(column_names[i]);
+            if (it != name_to_pos.end()) {
+                (*slot_pos)[i] = it->second;
+            }
+        }
+    }
+};
+
+class ParquetSchemaModeHandler final : public ParquetMetadataReader::ModeHandler {
+public:
+    explicit ParquetSchemaModeHandler(RuntimeState* state) : ModeHandler(state) {}
+
+    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
+        std::unordered_map<std::string, int> name_to_pos = _build_name_to_pos_map(slots);
+        _init_slot_pos_map(name_to_pos, kSchemaColumnNames, &_slot_pos);
+    }
+
+    Status append_rows(const std::string& path, FileMetaData* metadata,
+                       std::vector<MutableColumnPtr>& columns) override {
+        const auto& fields = metadata->schema().get_fields_schema();
+        for (const auto& field : fields) {
+            RETURN_IF_ERROR(_append_schema_field(path, field, field.name, columns));
+        }
+        return Status::OK();
+    }
+
+private:
+    std::array<int, SCHEMA_COLUMN_COUNT> _slot_pos {};
+
+    Status _append_schema_field(const std::string& path, const FieldSchema& field,
+                                const std::string& current_path,
+                                std::vector<MutableColumnPtr>& columns) {
+        if (!field.children.empty()) {
+            for (const auto& child : field.children) {
+                std::string child_path = current_path.empty()
+                                                 ? child.name
+                                                 : fmt::format("{}.{}", current_path, child.name);
+                RETURN_IF_ERROR(_append_schema_field(path, child, child_path, columns));
+            }
+            return Status::OK();
+        }
+
+        auto insert_if_requested = [&](SchemaColumnIndex idx, auto&& inserter, auto&&... args) {
+            int pos = _slot_pos[idx];
+            if (pos >= 0) {
+                inserter(columns[pos], std::forward<decltype(args)>(args)...);
+            }
+        };
+
+        insert_if_requested(SCHEMA_FILE_NAME, insert_string, path);
+        insert_if_requested(SCHEMA_COLUMN_NAME, insert_string, field.name);
+        insert_if_requested(SCHEMA_COLUMN_PATH, insert_string, current_path);
+        insert_if_requested(SCHEMA_PHYSICAL_TYPE, insert_string,
+                            physical_type_to_string(field.physical_type));
+        insert_if_requested(SCHEMA_LOGICAL_TYPE, insert_string,
+                            logical_type_to_string(field.parquet_schema));
+        insert_if_requested(SCHEMA_REPETITION_LEVEL, insert_int32, field.repetition_level);
+        insert_if_requested(SCHEMA_DEFINITION_LEVEL, insert_int32, field.definition_level);
+
+        int32_t type_length = field.parquet_schema.__isset.type_length
+                                      ? field.parquet_schema.type_length
+                                      : 0;
+        insert_if_requested(SCHEMA_TYPE_LENGTH, insert_int32, type_length);
+        int32_t precision =
+                field.parquet_schema.__isset.precision ? field.parquet_schema.precision : 0;
+        insert_if_requested(SCHEMA_PRECISION, insert_int32, precision);
+        int32_t scale = field.parquet_schema.__isset.scale ? field.parquet_schema.scale : 0;
+        insert_if_requested(SCHEMA_SCALE, insert_int32, scale);
+        bool is_nullable_field =
+                !field.parquet_schema.__isset.repetition_type ||
+                field.parquet_schema.repetition_type != tparquet::FieldRepetitionType::REQUIRED;
+        insert_if_requested(SCHEMA_IS_NULLABLE, insert_bool, is_nullable_field);
+        return Status::OK();
+    }
+};
+
+class ParquetMetadataModeHandler final : public ParquetMetadataReader::ModeHandler {
+public:
+    explicit ParquetMetadataModeHandler(RuntimeState* state) : ModeHandler(state) {}
+
+    void init_slot_pos_map(const std::vector<SlotDescriptor*>& slots) override {
+        std::unordered_map<std::string, int> name_to_pos = _build_name_to_pos_map(slots);
+        _init_slot_pos_map(name_to_pos, kMetadataColumnNames, &_slot_pos);
+    }
+
+    Status append_rows(const std::string& path, FileMetaData* metadata,
+                       std::vector<MutableColumnPtr>& columns) override {
+        const tparquet::FileMetaData& thrift_meta = metadata->to_thrift();
+        if (thrift_meta.row_groups.empty()) {
+            return Status::OK();
+        }
+
+        std::unordered_map<std::string, const FieldSchema*> path_map;
+        const auto& fields = metadata->schema().get_fields_schema();
+        for (const auto& field : fields) {
+            build_path_map(field, "", &path_map);
+        }
+
+        for (size_t rg_index = 0; rg_index < thrift_meta.row_groups.size(); ++rg_index) {
+            const auto& row_group = thrift_meta.row_groups[rg_index];
+            for (size_t col_idx = 0; col_idx < row_group.columns.size(); ++col_idx) {
+                const auto& column_chunk = row_group.columns[col_idx];
+                if (!column_chunk.__isset.meta_data) {
+                    continue;
+                }
+                const auto& column_meta = column_chunk.meta_data;
+                std::string column_path = join_path(column_meta.path_in_schema);
+                const FieldSchema* schema_field = nullptr;
+                auto it = path_map.find(column_path);
+                if (it != path_map.end()) {
+                    schema_field = it->second;
+                }
+
+                auto insert_if_requested =
+                        [&](MetadataColumnIndex idx, auto&& inserter, auto&&... args) {
+                            int pos = _slot_pos[idx];
+                            if (pos >= 0) {
+                                inserter(columns[pos], std::forward<decltype(args)>(args)...);
+                            }
+                        };
+
+                insert_if_requested(META_FILE_NAME, insert_string,
+                                    column_chunk.__isset.file_path ? column_chunk.file_path
+                                                                   : path);
+                insert_if_requested(META_ROW_GROUP_ID, insert_int32,
+                                    static_cast<Int32>(rg_index));
+                insert_if_requested(META_COLUMN_ID, insert_int32, static_cast<Int32>(col_idx));
+                std::string column_name = column_meta.path_in_schema.empty()
+                                                  ? ""
+                                                  : column_meta.path_in_schema.back();
+                insert_if_requested(META_COLUMN_NAME, insert_string, column_name);
+                insert_if_requested(META_COLUMN_PATH, insert_string, column_path);
+                insert_if_requested(META_PHYSICAL_TYPE, insert_string,
+                                    physical_type_to_string(column_meta.type));
+
+                if (schema_field != nullptr) {
+                    insert_if_requested(META_LOGICAL_TYPE, insert_string,
+                                        logical_type_to_string(schema_field->parquet_schema));
+                    int32_t type_length = schema_field->parquet_schema.__isset.type_length
+                                                  ?
schema_field->parquet_schema.type_length + : 0; + insert_if_requested(META_TYPE_LENGTH, insert_int32, type_length); + if (schema_field->parquet_schema.__isset.converted_type) { + insert_if_requested(META_CONVERTED_TYPE, insert_string, + converted_type_to_string( + schema_field->parquet_schema.converted_type)); + } else { + insert_if_requested(META_CONVERTED_TYPE, insert_string, ""); + } + } else { + insert_if_requested(META_LOGICAL_TYPE, insert_string, ""); + insert_if_requested(META_TYPE_LENGTH, insert_int32, 0); + insert_if_requested(META_CONVERTED_TYPE, insert_string, ""); + } + + insert_if_requested(META_NUM_VALUES, insert_int64, column_meta.num_values); + if (column_meta.__isset.statistics && + column_meta.statistics.__isset.null_count) { + insert_if_requested(META_NULL_COUNT, insert_int64, + column_meta.statistics.null_count); + } else { + insert_if_requested(META_NULL_COUNT, insert_null); + } + if (column_meta.__isset.statistics && + column_meta.statistics.__isset.distinct_count) { + insert_if_requested(META_DISTINCT_COUNT, insert_int64, + column_meta.statistics.distinct_count); + } else { + insert_if_requested(META_DISTINCT_COUNT, insert_null); + } + + insert_if_requested(META_ENCODINGS, insert_string, + encodings_to_string(column_meta.encodings)); + insert_if_requested(META_COMPRESSION, insert_string, + compression_to_string(column_meta.codec)); + insert_if_requested(META_DATA_PAGE_OFFSET, insert_int64, + column_meta.data_page_offset); + if (column_meta.__isset.index_page_offset) { + insert_if_requested(META_INDEX_PAGE_OFFSET, insert_int64, + column_meta.index_page_offset); + } else { + insert_if_requested(META_INDEX_PAGE_OFFSET, insert_null); + } + if (column_meta.__isset.dictionary_page_offset) { + insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_int64, + column_meta.dictionary_page_offset); + } else { + insert_if_requested(META_DICTIONARY_PAGE_OFFSET, insert_null); + } + insert_if_requested(META_TOTAL_COMPRESSED_SIZE, insert_int64, + column_meta.total_compressed_size); + insert_if_requested(META_TOTAL_UNCOMPRESSED_SIZE, insert_int64, + column_meta.total_uncompressed_size); + + if (column_meta.__isset.statistics) { + static const cctz::time_zone kUtc0 = cctz::utc_time_zone(); + const cctz::time_zone& ctz = _state != nullptr ? 
_state->timezone_obj() : kUtc0; + + std::string min_value; + std::string max_value; + bool has_min = try_get_statistics_encoded_value( + column_meta.statistics, true, &min_value); + bool has_max = try_get_statistics_encoded_value( + column_meta.statistics, false, &max_value); + + if (has_min) { + std::string decoded = + decode_statistics_value(schema_field, column_meta.type, min_value, ctz); + insert_if_requested(META_STATISTICS_MIN, insert_string, decoded); + } else { + insert_if_requested(META_STATISTICS_MIN, insert_null); + } + if (has_max) { + std::string decoded = + decode_statistics_value(schema_field, column_meta.type, max_value, ctz); + insert_if_requested(META_STATISTICS_MAX, insert_string, decoded); + } else { + insert_if_requested(META_STATISTICS_MAX, insert_null); + } + } else { + insert_if_requested(META_STATISTICS_MIN, insert_null); + insert_if_requested(META_STATISTICS_MAX, insert_null); + } + } + } + return Status::OK(); + } + +private: + std::array<int, META_COLUMN_COUNT> _slot_pos {}; +}; + +ParquetMetadataReader::ParquetMetadataReader(std::vector<SlotDescriptor*> slots, + RuntimeState* state, RuntimeProfile* profile, + TMetaScanRange scan_range) + : _state(state), _slots(std::move(slots)), _scan_range(std::move(scan_range)) { + (void)profile; +} + +ParquetMetadataReader::~ParquetMetadataReader() = default; + +Status ParquetMetadataReader::init_reader() { + RETURN_IF_ERROR(_init_from_scan_range(_scan_range)); + if (_mode_type == Mode::SCHEMA) { + _mode_handler = std::make_unique<ParquetSchemaModeHandler>(_state); + } else { + _mode_handler = std::make_unique<ParquetMetadataModeHandler>(_state); + } + _mode_handler->init_slot_pos_map(_slots); + return Status::OK(); +} + +Status ParquetMetadataReader::_init_from_scan_range(const TMetaScanRange& scan_range) { + if (!scan_range.__isset.parquet_params) { + return Status::InvalidArgument( + "Missing parquet parameters for parquet_metadata table function"); + } + const TParquetMetadataParams& params = scan_range.parquet_params; + if (!params.__isset.paths || params.paths.empty()) { + return Status::InvalidArgument("Property 'path' must be set for parquet_metadata"); Review Comment: Error messages in the backend reference "parquet_metadata" instead of the actual function name "parquet_meta". This is inconsistent with the function name defined on the frontend (NAME = "parquet_meta"). Update these error messages to use "parquet_meta" for consistency. ```suggestion "Missing parquet parameters for parquet_meta table function"); } const TParquetMetadataParams& params = scan_range.parquet_params; if (!params.__isset.paths || params.paths.empty()) { return Status::InvalidArgument("Property 'path' must be set for parquet_meta"); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
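For context, the function name is what users will type at the SQL layer, which is why the naming consistency matters. A rough sketch of an invocation follows; the `"path"` and `"mode"` property spellings are assumptions inferred from the error messages and mode values above (not confirmed against the FE parser), and the file location is a placeholder:

```sql
-- Hypothetical call to the parquet_meta TVF; property names are assumed, not confirmed.
SELECT *
FROM parquet_meta(
    "path" = "s3://example-bucket/part-0.parquet",  -- placeholder file location
    "mode" = "parquet_metadata"                     -- or "parquet_schema"
);
```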
