[
https://issues.apache.org/jira/browse/FLINK-29729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641045#comment-17641045
]
dalongliu edited comment on FLINK-29729 at 11/30/22 3:08 AM:
-------------------------------------------------------------
[~stayrascal] [~qinjunjerry] Sorry for the late response. I think we should
support configuring the ak/sk both at the flink-conf.yaml level and at the job
level, with the job level taking precedence: we read the ak/sk from the table
options first, and if they are not configured there, we fall back to
flink-conf.yaml. That way I think we can cover all of the ak/sk requirements.
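In other words, something along these lines (just a sketch; the option key and
the helper are only for illustration, they are not existing Flink API):
{code:java}
import java.util.Optional;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;

public class CredentialResolutionSketch {

    // Illustrative key only; the final option name is still open for discussion.
    private static final ConfigOption<String> ACCESS_KEY =
            ConfigOptions.key("fs.s3a.access.key").stringType().noDefaultValue();

    /** Job-level (table/format) options win; otherwise fall back to flink-conf.yaml. */
    static Optional<String> resolveAccessKey(
            ReadableConfig tableOptions, Configuration clusterConf) {
        Optional<String> fromTable = tableOptions.getOptional(ACCESS_KEY);
        return fromTable.isPresent() ? fromTable : clusterConf.getOptional(ACCESS_KEY);
    }
}
{code}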
> From my understanding, if we want to configure the AWS credential info at job
> level via format options, the filesystem connector format should support
> customized properties first, right? And the cloud IAM credential
> info (AK/SK) should not be coupled with the generic filesystem connector options.
Yes, I think we can try to support it via customized format options in the
filesystem connector. Could you help with a PoC to verify whether it is feasible?
The HBase connector also supports custom options that start with
'properties.'; see
[https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hbase/#connector-options]
for details.
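A rough PoC sketch of that 'properties.*' forwarding idea (the class and method
names are mine, nothing like this exists in the filesystem connector yet):
{code:java}
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class PropertiesForwardingSketch {

    private static final String PROPERTIES_PREFIX = "properties.";

    /**
     * Copies every table option that starts with 'properties.' into the Hadoop
     * configuration used by the reader, mirroring the HBase connector's pattern.
     */
    static Configuration applyUserProperties(
            Map<String, String> tableOptions, Configuration hadoopConf) {
        tableOptions.forEach(
                (key, value) -> {
                    if (key.startsWith(PROPERTIES_PREFIX)) {
                        hadoopConf.set(key.substring(PROPERTIES_PREFIX.length()), value);
                    }
                });
        return hadoopConf;
    }
}
{code}
With that in place a user could put e.g. 'properties.fs.s3a.access.key' directly
into the table DDL and have it land in the Hadoop configuration of the reader.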
> Support including the configured properties from flink-conf.yaml during
> create ParquetReader
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-29729
> URL: https://issues.apache.org/jira/browse/FLINK-29729
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Reporter: Zhiping Wu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
> Attachments: image-2022-10-22-17-41-38-084.png
>
>
> Hi, I'm wondering whether we can include the configured properties from
> flink-conf.yaml when creating the ParquetReader in `ParquetVectorizedInputFormat`,
> in addition to the Hadoop configuration.
>
> I have a use case where I want to query a table in an S3 bucket, stored in
> Parquet format, via the filesystem connector, and I configured the AWS credential
> info in `flink-conf.yaml`, e.g. fs.s3a.access.key, fs.s3a.secret.key, etc.
>
> The JobManager (SourceCoordinator) handles "getFileStatus" on the S3
> objects and generates the splits without problems, but the TaskManager
> (SourceOperator -> ParquetVectorizedInputFormat -> ParquetReader) fails because
> the AWS credential info is missing.
>
> After taking a closer look at the source code that creates the ParquetReader
> to read the footer, I found that the AWS credential info is not passed when the
> S3AFileSystem is created and initialized; the details are shown in the
> screenshot below. !image-2022-10-22-17-41-38-084.png!
>
> The `hadoopConfig` only contains the properties from the table format options
> and the default Hadoop properties from core-site.xml, hdfs-site.xml, etc.,
> because the `hadoopConfig` is built via
> `ParquetFileFormatFactory#createRuntimeDecoder` ->
> `ParquetColumnarRowInputFormat.createPartitionedFormat` ->
> `ParquetFileFormatFactory.getParquetConfiguration`:
>
> {code:java}
> @Override
> public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
>         DynamicTableSource.Context sourceContext,
>         DataType producedDataType,
>         int[][] projections) {
>     return ParquetColumnarRowInputFormat.createPartitionedFormat(
>             getParquetConfiguration(formatOptions),
>             (RowType) Projection.of(projections).project(producedDataType).getLogicalType(),
>             sourceContext.createTypeInformation(producedDataType),
>             Collections.emptyList(),
>             null,
>             VectorizedColumnBatch.DEFAULT_SIZE,
>             formatOptions.get(UTC_TIMEZONE),
>             true);
> }
>
> private static Configuration getParquetConfiguration(ReadableConfig options) {
>     Configuration conf = new Configuration();
>     Properties properties = new Properties();
>     ((org.apache.flink.configuration.Configuration) options).addAllToProperties(properties);
>     properties.forEach((k, v) -> conf.set(IDENTIFIER + "." + k, v.toString()));
>     return conf;
> }
> {code}
>
> I know that I can add the AWS credential info to core-site.xml or
> hdfs-site.xml so that the `ParquetReader` can get the credentials, but I
> think that is not a good practice, especially when different Flink jobs use
> different AWS credentials. So I'm wondering whether we can combine the default
> Hadoop configuration (static) with the properties from
> `flink-conf.yaml` (dynamic) when creating the `ParquetReader`,
> for example like this PR does:
> https://github.com/apache/flink/pull/21130
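>
> To make the direction a bit more concrete, here is a rough sketch of what a
> combined `getParquetConfiguration` could look like (just an illustration of the
> idea, not necessarily what the PR above actually does; whether
> `GlobalConfiguration.loadConfiguration()` is the right hook on the TaskManager
> side would still need to be verified):
> {code:java}
> private static Configuration getParquetConfiguration(ReadableConfig formatOptions) {
>     // Static Hadoop defaults (core-site.xml etc.) are picked up by
>     // new Configuration() when they are on the classpath.
>     Configuration conf = new Configuration();
>
>     // Dynamic, cluster-wide properties from flink-conf.yaml, e.g. fs.s3a.access.key.
>     org.apache.flink.configuration.Configuration flinkConf =
>             GlobalConfiguration.loadConfiguration();
>     flinkConf.toMap().forEach(conf::set);
>
>     // Job-level format options, prefixed as before.
>     Properties properties = new Properties();
>     ((org.apache.flink.configuration.Configuration) formatOptions)
>             .addAllToProperties(properties);
>     properties.forEach((k, v) -> conf.set(IDENTIFIER + "." + k, v.toString()));
>     return conf;
> }
> {code}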
>
> BTW, I'm using Flink 1.15.1 on a standalone cluster to validate the whole
> process, but I don't think this problem is specific to 1.15.1, nor to
> accessing objects/files in an AWS S3 bucket; any other cloud object storage
> might hit the same problem.
>
> Besides changing the code, is there any other solution that could help me
> handle this problem? Thanks.