[
https://issues.apache.org/jira/browse/FLINK-29729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691000#comment-17691000
]
dalongliu commented on FLINK-29729:
-----------------------------------
Sorry for the late response. After taking a deep look at the Flink filesystem
mechanism, I think we currently can't support configuring authentication
credentials at the job level, because the filesystem is cached at the process
level. So we can only configure the authentication credentials in
flink-conf.yaml or in Hadoop's core-site.xml; other stores such as Hudi
currently only support configuring authentication information in the hadoop
core-site.xml file.
_As described in the issue, we can't get the Hadoop-related authentication
information from flink-conf.yaml when creating the {{ParquetReader}} with
S3AFileSystem on the TM, while on the JM, {{SourceCoordinator}} works fine when
calling getFileStatus against s3a. After taking a deep look at the source code,
I found that on the JM side we use Flink's own FileSystem to get
{{FileStatus}}, which picks up the authentication information from
flink-conf.yaml; but on the TM side we use Hadoop's own FileSystem to read the
footer, which can't see that authentication information. This behavior is
inconsistent and causes the problem described in the issue. I think this is a
bug, so we should use Flink's FileSystem on both the JM side and the TM side._
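A minimal sketch of the two code paths contrasted above (illustrative only, not the actual Flink source; the path and variable names are hypothetical):
{code:java}
import java.io.IOException;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;

public class JmVsTmPathSketch {

    // JM side: Flink's own FileSystem, which is configured from flink-conf.yaml,
    // is used to stat the split files, so the fs.s3a.* credentials are visible.
    static FileStatus statViaFlinkFs() throws IOException {
        Path path = new Path("s3a://my-bucket/table/part-0.parquet");
        return path.getFileSystem().getFileStatus(path);
    }

    // TM side: the Parquet footer is read through Hadoop's FileSystem, built from
    // a plain Hadoop Configuration that never sees the flink-conf.yaml entries,
    // so the S3AFileSystem comes up without credentials.
    static org.apache.hadoop.fs.FileSystem openViaHadoopFs() throws IOException {
        org.apache.hadoop.fs.Path path =
                new org.apache.hadoop.fs.Path("s3a://my-bucket/table/part-0.parquet");
        return path.getFileSystem(new org.apache.hadoop.conf.Configuration());
    }
}
{code}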
> Support including the configured properties from flink-conf.yaml during
> create ParquetReader
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-29729
> URL: https://issues.apache.org/jira/browse/FLINK-29729
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Reporter: Rascal Wu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: image-2022-10-22-17-41-38-084.png
>
>
> Hi, I'm wondering whether we can include the configured properties from
> flink-conf.yaml, in addition to the hadoop configuration, when creating the
> ParquetReader in `ParquetVectorizedInputFormat`.
>
> I have a use case where I want to query a table in parquet format from an S3
> bucket via the filesystem connector, and I configured the AWS credential info
> in `flink-conf.yaml`, e.g. fs.s3a.access.key, fs.s3a.secret.key, etc.
>
> The JobManager (SourceCoordinator) works fine: it can "getFileStatus" on the
> S3 objects and generate the splits, but the TaskManager (SourceOperator ->
> ParquetVectorizedInputFormat -> ParquetReader) fails because the AWS
> credential info is missing.
>
> After taking a deep look at the source code that creates the ParquetReader to
> read the footer, I found that the AWS credential info is not passed when the
> S3AFileSystem is created and initialized; the details are shown in the
> snapshot below. !image-2022-10-22-17-41-38-084.png!
>
> The `hadoopConfig` only contains the properties from the table format options
> and the default hadoop properties from core-site.xml, hdfs-site.xml, etc.,
> because the `hadoopConfig` is built via
> `ParquetFileFormatFactory#createRuntimeDecoder` ->
> `ParquetColumnarRowInputFormat.createPartitionedFormat` ->
> `ParquetFileFormatFactory.getParquetConfiguration`
>
> {code:java}
> @Override
> public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
>         DynamicTableSource.Context sourceContext,
>         DataType producedDataType,
>         int[][] projections) {
>     return ParquetColumnarRowInputFormat.createPartitionedFormat(
>             getParquetConfiguration(formatOptions),
>             (RowType) Projection.of(projections).project(producedDataType).getLogicalType(),
>             sourceContext.createTypeInformation(producedDataType),
>             Collections.emptyList(),
>             null,
>             VectorizedColumnBatch.DEFAULT_SIZE,
>             formatOptions.get(UTC_TIMEZONE),
>             true);
> }
>
> private static Configuration getParquetConfiguration(ReadableConfig options) {
>     Configuration conf = new Configuration();
>     Properties properties = new Properties();
>     ((org.apache.flink.configuration.Configuration) options).addAllToProperties(properties);
>     properties.forEach((k, v) -> conf.set(IDENTIFIER + "." + k, v.toString()));
>     return conf;
> }
> {code}
>
> I know that I can add the AWS credential info to core-site.xml or
> hdfs-site.xml so that the `ParquetReader` can get the credentials, but I
> don't think that is a good practice, especially when different Flink jobs use
> different AWS credentials. So I'm wondering whether we can combine the
> default hadoop configuration (static) and the properties from
> `flink-conf.yaml` (dynamic) when creating the `ParquetReader`, as sketched
> after the link below.
> For example, just like what this PR does:
> https://github.com/apache/flink/pull/21130
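> To make the idea concrete, here is a minimal sketch of the combining step
> (this is not the code from the PR above; the class, method, and variable
> names are hypothetical):
> {code:java}
> import org.apache.flink.configuration.GlobalConfiguration;
>
> import org.apache.hadoop.conf.Configuration;
>
> public class CombinedParquetConfSketch {
>
>     // Copy the cluster-level flink-conf.yaml entries on top of the static Hadoop
>     // defaults (core-site.xml / hdfs-site.xml) before the Hadoop Configuration is
>     // handed to the ParquetReader, so that keys such as fs.s3a.access.key become
>     // visible to the S3AFileSystem it creates.
>     static Configuration withFlinkConf(Configuration hadoopConf) {
>         org.apache.flink.configuration.Configuration flinkConf =
>                 GlobalConfiguration.loadConfiguration(); // reads flink-conf.yaml
>         flinkConf.toMap().forEach(hadoopConf::set);
>         return hadoopConf;
>     }
> }
> {code}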
>
> BTW, I'm using Flink 1.15.1 in a standalone cluster to validate the whole
> process, but I think this problem is not limited to 1.15.1, and it is not
> limited to accessing objects/files in an AWS S3 bucket; any other cloud
> object storage might hit it as well.
>
> Besides changing the code, is there any other solution that can help me
> handle this problem? Thanks.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)