[
https://issues.apache.org/jira/browse/FLINK-20201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-20201:
-----------------------------------
Labels: auto-deprioritized-major stale-minor (was:
auto-deprioritized-major)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is
still Minor, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> Support automatic adjustment of window parameters
> -------------------------------------------------
>
> Key: FLINK-20201
> URL: https://issues.apache.org/jira/browse/FLINK-20201
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.11.2
> Reporter: lqjacklee
> Priority: Minor
> Labels: auto-deprioritized-major, stale-minor
>
> {code:java}
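> import java.lang.reflect.Field;
> import java.util.Arrays;
> import java.util.Map;
> import java.util.stream.Collectors;
> import java.util.stream.Stream;
> import java.util.stream.StreamSupport;
>
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.api.java.typeutils.PojoField;
> import org.apache.flink.api.java.typeutils.PojoTypeInfo;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> // The enclosing class was omitted from the original snippet; its name here is illustrative.
> public class WindowAggregateExample {
>
>     // Declared fields of Entity, keyed by field name, for building the PojoField list.
>     static Map<String, Field> fieldMap;
>     static {
>         fieldMap = Stream.of(Entity.class.getDeclaredFields())
>             .collect(Collectors.toMap(Field::getName, field -> field));
>     }
>
>     public static class Entity extends PojoTypeInfo<Entity> implements Comparable<Entity> {
>         public String name;
>         public long currentDate;
>         public int purchaseVolume;
>
>         public Entity(String name, int purchaseVolume) {
>             super(Entity.class, Arrays.asList(
>                 new PojoField(fieldMap.get("name"), BasicTypeInfo.STRING_TYPE_INFO),
>                 new PojoField(fieldMap.get("purchaseVolume"), BasicTypeInfo.INT_TYPE_INFO),
>                 // currentDate is a long, so it needs LONG_TYPE_INFO rather than DATE_TYPE_INFO.
>                 new PojoField(fieldMap.get("currentDate"), BasicTypeInfo.LONG_TYPE_INFO)
>             ));
>             this.name = name;
>             this.purchaseVolume = purchaseVolume;
>             this.currentDate = System.currentTimeMillis();
>         }
>
>         // Orders entities by creation timestamp.
>         @Override
>         public int compareTo(Entity o) {
>             return Long.compare(currentDate, o.currentDate);
>         }
>
>         public String getName() {
>             return name;
>         }
>
>         public void setName(String name) {
>             this.name = name;
>         }
>
>         public long getCurrentDate() {
>             return currentDate;
>         }
>
>         public void setCurrentDate(long currentDate) {
>             this.currentDate = currentDate;
>         }
>
>         public int getPurchaseVolume() {
>             return purchaseVolume;
>         }
>
>         public void setPurchaseVolume(int purchaseVolume) {
>             this.purchaseVolume = purchaseVolume;
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env
>             .fromElements(new Entity("jack", 10), new Entity("tom", 20), new Entity("jack", 30))
>             .keyBy((KeySelector<Entity, String>) value -> value.name, BasicTypeInfo.STRING_TYPE_INFO)
>             // Fixed window size: this is the parameter the issue proposes to adjust automatically.
>             .timeWindow(Time.milliseconds(1))
>             .process(new ProcessWindowFunction<Entity, Integer, String, TimeWindow>() {
>                 @Override
>                 public void process(String s, Context context, Iterable<Entity> elements,
>                         Collector<Integer> out) throws Exception {
>                     // Emit the purchase volume of every element in the window.
>                     StreamSupport.stream(elements.spliterator(), false)
>                         .map(Entity::getPurchaseVolume)
>                         .forEach(out::collect);
>                 }
>             })
>             .print();
>         env.execute("window aggregate");
>     }
> }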
> {code}
> The sample code above shows how the window size is currently passed to Flink
> as a fixed parameter. We would like Flink to provide a way to adjust the
> window size automatically.
> Because the rate of data generation varies greatly, we hope to adjust the
> window size dynamically according to the data volume.
> The window size should be generated by machine learning. In that case, the
> user would only need to configure the allowed window time range.
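To make the request more concrete, here is a minimal sketch of how a window size could be derived from observed data volume and kept inside a user-configured range. It is not Flink API and not the reporter's design: the class `AdaptiveWindowSizer`, its parameters, and the target-elements-per-window rule are all assumptions for illustration, and a simple rate-based heuristic stands in for the machine-learning model the issue mentions.

```java
/**
 * Hypothetical helper, not part of Flink: proposes the next window size from
 * the number of elements observed in the previous window.
 */
final class AdaptiveWindowSizer {
    private final long minWindowMillis;          // lower bound configured by the user
    private final long maxWindowMillis;          // upper bound configured by the user
    private final long targetElementsPerWindow;  // assumed tuning goal

    AdaptiveWindowSizer(long minWindowMillis, long maxWindowMillis, long targetElementsPerWindow) {
        if (minWindowMillis <= 0 || maxWindowMillis < minWindowMillis || targetElementsPerWindow <= 0) {
            throw new IllegalArgumentException("invalid window range or target");
        }
        this.minWindowMillis = minWindowMillis;
        this.maxWindowMillis = maxWindowMillis;
        this.targetElementsPerWindow = targetElementsPerWindow;
    }

    /**
     * Returns the window size (in milliseconds) that would have held roughly
     * targetElementsPerWindow elements at the observed rate, clamped to the
     * configured [min, max] range.
     */
    long nextWindowMillis(long observedCount, long currentWindowMillis) {
        if (currentWindowMillis <= 0) {
            throw new IllegalArgumentException("currentWindowMillis must be positive");
        }
        if (observedCount <= 0) {
            // No data seen: fall back to the largest allowed window.
            return maxWindowMillis;
        }
        double desired = (double) targetElementsPerWindow * currentWindowMillis / observedCount;
        long rounded = Math.round(desired);
        return Math.max(minWindowMillis, Math.min(maxWindowMillis, rounded));
    }
}
```

For example, with a range of 100 ms to 10,000 ms and a target of 1,000 elements per window, observing 2,000 elements in a 1,000 ms window yields a proposal of 500 ms. Flink's built-in tumbling window assigners take a fixed size, so wiring such a value into a job would likely require a custom `WindowAssigner`; that part is left out of this sketch.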
--
This message was sent by Atlassian Jira
(v8.20.1#820001)