Zhiping Wu created FLINK-29729:
----------------------------------

             Summary: Support including the configured properties from 
flink-conf.yaml when creating ParquetReader
                 Key: FLINK-29729
                 URL: https://issues.apache.org/jira/browse/FLINK-29729
             Project: Flink
          Issue Type: Improvement
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
            Reporter: Zhiping Wu
         Attachments: image-2022-10-22-17-41-38-084.png

Hi, I'm wondering whether we can include the configured properties from 
flink-conf.yaml, in addition to the Hadoop configuration, when creating the 
ParquetReader in `ParquetVectorizedInputFormat`.

 

I have a use case where I want to query a table stored as Parquet in an S3 
bucket via the filesystem connector, and I configured the AWS credentials in 
`flink-conf.yaml`, e.g. fs.s3a.access.key, fs.s3a.secret.key, etc.
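
For reference, a minimal sketch of that setup (table name, bucket, schema and 
path are made up for illustration; the fs.s3a.* keys live only in 
flink-conf.yaml, not in the DDL):

{code}
// Minimal sketch of the setup described above; all names are illustrative.
// The AWS credentials are only configured in flink-conf.yaml, e.g.:
//   fs.s3a.access.key: <access-key>
//   fs.s3a.secret.key: <secret-key>
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class S3ParquetReadExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Filesystem connector table backed by Parquet files in an S3 bucket.
        tEnv.executeSql(
                "CREATE TABLE my_parquet_table ("
                        + "  id BIGINT,"
                        + "  name STRING"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = 's3a://my-bucket/my-table/',"
                        + "  'format' = 'parquet'"
                        + ")");

        // Split generation on the JobManager succeeds, but the ParquetReader on the
        // TaskManager fails because the fs.s3a.* credentials never reach its Hadoop conf.
        tEnv.executeSql("SELECT * FROM my_parquet_table").print();
    }
}
{code}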

 

The JobManager (SourceCoordinator) works well: it can call "getFileStatus" on 
the S3 objects and generate splits. But the TaskManager (SourceOperator -> 
ParquetVectorizedInputFormat -> ParquetReader) fails because the AWS 
credentials are missing.

 

After digging into the source code that creates the ParquetReader to read the 
footer, I found that the AWS credentials are not passed when the S3AFileSystem 
is created and initialized; the details are shown in the snapshot below. 
!image-2022-10-22-17-41-38-084.png!

 

The `hadoopConfig` only contains the properties from the table format options 
plus the default Hadoop properties from core-site.xml, hdfs-site.xml, etc., 
because the `hadoopConfig` is built via 
`ParquetFileFormatFactory#createRuntimeDecoder` -> 
`ParquetColumnarRowInputFormat.createPartitionedFormat` -> 
`ParquetFileFormatFactory.getParquetConfiguration`:

 

{code}
@Override
public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
        DynamicTableSource.Context sourceContext,
        DataType producedDataType,
        int[][] projections) {

    return ParquetColumnarRowInputFormat.createPartitionedFormat(
            getParquetConfiguration(formatOptions),
            (RowType) Projection.of(projections).project(producedDataType).getLogicalType(),
            sourceContext.createTypeInformation(producedDataType),
            Collections.emptyList(),
            null,
            VectorizedColumnBatch.DEFAULT_SIZE,
            formatOptions.get(UTC_TIMEZONE),
            true);
}

private static Configuration getParquetConfiguration(ReadableConfig options) {
    Configuration conf = new Configuration();
    Properties properties = new Properties();
    ((org.apache.flink.configuration.Configuration) options).addAllToProperties(properties);
    properties.forEach((k, v) -> conf.set(IDENTIFIER + "." + k, v.toString()));
    return conf;
}
{code}

 

I know that I could add the AWS credentials to core-site.xml or hdfs-site.xml 
so that the `ParquetReader` can pick them up, but I don't think that is a good 
practice, especially when different Flink jobs need different AWS credentials. 
So I'm wondering whether we can combine the default Hadoop configuration 
(static) with the properties from `flink-conf.yaml` (dynamic) when creating 
the `ParquetReader`, roughly as sketched below.
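
Just to illustrate the idea (a rough sketch, not a worked-out patch; it 
assumes flink-conf.yaml is readable on the TaskManager via 
GlobalConfiguration.loadConfiguration(), and key filtering/prefixing would 
still need discussion), `getParquetConfiguration` could look roughly like this:

{code}
// Rough sketch of a possible change inside ParquetFileFormatFactory (not a real patch).
// "Configuration" below is org.apache.hadoop.conf.Configuration, and IDENTIFIER is the
// existing "parquet" factory identifier; this assumes flink-conf.yaml can be loaded on
// the TaskManager via GlobalConfiguration.loadConfiguration().
private static Configuration getParquetConfiguration(ReadableConfig options) {
    Configuration conf = new Configuration();

    // 1) Merge the cluster-level Flink configuration (flink-conf.yaml) so that e.g.
    //    fs.s3a.access.key / fs.s3a.secret.key reach the Hadoop FileSystem that the
    //    ParquetReader creates.
    org.apache.flink.configuration.Configuration flinkConf =
            org.apache.flink.configuration.GlobalConfiguration.loadConfiguration();
    flinkConf.toMap().forEach(conf::set);

    // 2) Keep the existing behavior: copy the table format options under the
    //    "parquet." prefix.
    Properties properties = new Properties();
    ((org.apache.flink.configuration.Configuration) options).addAllToProperties(properties);
    properties.forEach((k, v) -> conf.set(IDENTIFIER + "." + k, v.toString()));
    return conf;
}
{code}

Alternatively, if reading flink-conf.yaml directly in the factory is not 
desirable, maybe the configuration could be passed down from the factory 
context (e.g. `DynamicTableFactory.Context#getConfiguration`), assuming it 
carries the needed keys at that point.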

 

BTW, I'm using Flink 1.15.1 on a standalone cluster to validate the whole 
process, but I don't think this problem is limited to 1.15.1, nor to AWS S3; 
any other cloud object storage accessed this way would likely hit the same 
problem.

 

Besides changing the code, is there any other solution that could help me 
handle this problem? Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
