[ https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17458457#comment-17458457 ]
Timo Walther commented on FLINK-19589: -------------------------------------- Another request on this topic: https://lists.apache.org/thread/4f9y7soqql4zlh2lc31jv2vj7royy01g I also believe this makes a lot of sense which is why I increased the priority. > Support per-connector FileSystem configuration > ---------------------------------------------- > > Key: FLINK-19589 > URL: https://issues.apache.org/jira/browse/FLINK-19589 > Project: Flink > Issue Type: Improvement > Components: FileSystems > Affects Versions: 1.12.0 > Reporter: Padarn Wilson > Priority: Major > Labels: auto-deprioritized-minor, auto-unassigned > > Currently, options for file systems can only be configured globally. However, > in many cases, users would like to configure more fine-grained. > Either we allow a properties map similar to Kafka or Kinesis properties to > our connectors. > Or something like: > Management of two properties related S3 Object management: > - [Lifecycle configuration > |https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html] > - [Object > tagging|https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.htm] > Being able to control these is useful for people who want to manage jobs > using S3 for checkpointing or job output, but need to control per job level > configuration of the tagging/lifecycle for the purposes of auditing or cost > control (for example deleting old state from S3) > Ideally, it would be possible to control this on each object being written by > Flink, or at least at a job level. > _Note_*:* Some related existing properties can be set using the hadoop module > using system properties: see for example > {code:java} > fs.s3a.acl.default{code} > which sets the default ACL on written objects. > *Solutions*: > 1) Modify hadoop module: > The above-linked module could be updated in order to have a new property (and > similar for lifecycle) > fs.s3a.tags.default > which could be a comma separated list of tags to set. For example > {code:java} > fs.s3a.acl.default = "jobname:JOBNAME,owner:OWNER"{code} > This seems like a natural place to put this logic (and is outside of Flink if > we decide to go this way. However it does not allow for a sink and checkpoint > to have different values for these. > 2) Expose withTagging from module > The hadoop module used by Flink's existing filesystem has already exposed put > request level tagging (see > [this|https://github.com/aws/aws-sdk-java/blob/c06822732612d7208927d2a678073098522085c3/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/PutObjectRequest.java#L292]). > This could be used in the Flink filesystem plugin to expose these options. A > possible approach could be to somehow incorporate it into the file path, e.g., > {code:java} > path = "TAGS:s3://bucket/path"{code} > Or possible as an option that can be applied to the checkpoint and sink > configurations, e.g., > {code:java} > env.getCheckpointingConfig().setS3Tags(TAGS) {code} > and similar for a file sink. > _Note_: The lifecycle can also be managed using the module: see > [here|https://docs.aws.amazon.com/AmazonS3/latest/dev/manage-lifecycle-using-java.html]. > > > -- This message was sent by Atlassian Jira (v8.20.1#820001)