[ https://issues.apache.org/jira/browse/FLINK-20201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Till Rohrmann updated FLINK-20201:
----------------------------------
Component/s: API / DataStream (was: Runtime / Coordination)
> Support automatic adjustment of window parameters
> -------------------------------------------------
>
> Key: FLINK-20201
> URL: https://issues.apache.org/jira/browse/FLINK-20201
> Project: Flink
> Issue Type: Wish
> Components: API / DataStream
> Affects Versions: 1.11.2
> Reporter: lqjacklee
> Priority: Major
>
> {code:java}
> // Look up the reflected fields of Entity once, keyed by field name.
> static Map<String, Field> fieldMap;
> static {
>     fieldMap = Stream.of(Entity.class.getDeclaredFields())
>             .collect(Collectors.toMap(Field::getName, field -> field));
> }
>
> public static class Entity extends PojoTypeInfo<Entity> implements Comparable<Entity> {
>     public String name;
>     public long currentDate;
>     public int purchaseVolume;
>
>     public Entity(String name, int purchaseVolume) {
>         super(Entity.class, Arrays.asList(
>                 new PojoField(fieldMap.get("name"), BasicTypeInfo.STRING_TYPE_INFO),
>                 new PojoField(fieldMap.get("purchaseVolume"), BasicTypeInfo.INT_TYPE_INFO),
>                 // currentDate is a long, so it is declared as LONG rather than DATE.
>                 new PojoField(fieldMap.get("currentDate"), BasicTypeInfo.LONG_TYPE_INFO)));
>         this.name = name;
>         this.purchaseVolume = purchaseVolume;
>         this.currentDate = System.currentTimeMillis();
>     }
>
>     // Orders entities by creation time.
>     @Override
>     public int compareTo(Entity o) {
>         return Long.compare(currentDate, o.currentDate);
>     }
>
>     public String getName() {
>         return name;
>     }
>
>     public void setName(String name) {
>         this.name = name;
>     }
>
>     public long getCurrentDate() {
>         return currentDate;
>     }
>
>     public void setCurrentDate(long currentDate) {
>         this.currentDate = currentDate;
>     }
>
>     public int getPurchaseVolume() {
>         return purchaseVolume;
>     }
>
>     public void setPurchaseVolume(int purchaseVolume) {
>         this.purchaseVolume = purchaseVolume;
>     }
> }
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env
>         .fromElements(new Entity("jack", 10), new Entity("tom", 20), new Entity("jack", 30))
>         .keyBy((KeySelector<Entity, String>) value -> value.name, BasicTypeInfo.STRING_TYPE_INFO)
>         // Fixed 1 ms window: this is the parameter the issue asks to make adjustable.
>         .timeWindow(Time.milliseconds(1))
>         .process(new ProcessWindowFunction<Entity, Integer, String, TimeWindow>() {
>             @Override
>             public void process(String s, Context context, Iterable<Entity> elements,
>                     Collector<Integer> out) throws Exception {
>                 // Emit the purchase volume of every element in the window.
>                 StreamSupport.stream(elements.spliterator(), false)
>                         .map(Entity::getPurchaseVolume)
>                         .iterator()
>                         .forEachRemaining(out::collect);
>             }
>         })
>         .print();
>     env.execute("window aggregate");
> }
> {code}
> The sample code above shows how the window parameter is currently used in
> Flink. We would like Flink to provide a way to adjust the window size
> automatically.
> Because the rate of data generation varies greatly, we hope to adjust the
> window size dynamically according to the data volume.
> The window size could be generated by machine learning. In that case, the
> user would only need to configure the allowed window time range.
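To make the request more concrete, here is a minimal sketch of how a window size could be derived from the observed data rate and kept within a user-configured range. It is plain Java and is not an existing Flink API. The class `AdaptiveWindowSizer`, its method `windowMillisFor`, and the "target events per window" heuristic are all assumptions made for illustration. The heuristic is a simple stand-in for the machine-learning model mentioned in the description.

```java
// Hypothetical helper, not part of Flink: picks a window size from an observed
// event rate and clamps it to a user-configured [min, max] range.
final class AdaptiveWindowSizer {
    private final long minWindowMillis;
    private final long maxWindowMillis;
    private final long targetEventsPerWindow;

    AdaptiveWindowSizer(long minWindowMillis, long maxWindowMillis, long targetEventsPerWindow) {
        if (minWindowMillis <= 0 || maxWindowMillis < minWindowMillis || targetEventsPerWindow <= 0) {
            throw new IllegalArgumentException("invalid window range or target");
        }
        this.minWindowMillis = minWindowMillis;
        this.maxWindowMillis = maxWindowMillis;
        this.targetEventsPerWindow = targetEventsPerWindow;
    }

    // Returns a window size that would hold roughly targetEventsPerWindow
    // events at the given rate, clamped to the configured range.
    long windowMillisFor(double eventsPerSecond) {
        // No traffic (or an unusable measurement): fall back to the largest window.
        if (!(eventsPerSecond > 0.0)) {
            return maxWindowMillis;
        }
        double idealMillis = targetEventsPerWindow * 1000.0 / eventsPerSecond;
        long rounded = (long) Math.ceil(idealMillis);
        return Math.max(minWindowMillis, Math.min(maxWindowMillis, rounded));
    }

    public static void main(String[] args) {
        // Range of 100 ms to 60 s, aiming for about 1000 events per window.
        AdaptiveWindowSizer sizer = new AdaptiveWindowSizer(100L, 60_000L, 1_000L);
        for (double rate : new double[] {0.0, 50.0, 1_000.0, 200_000.0}) {
            System.out.println(rate + " events/s -> " + sizer.windowMillisFor(rate) + " ms");
        }
    }
}
```

With these settings a slow stream gets a long window and a fast stream gets a short one, and the size never leaves the configured range. Wiring such a policy into a job, for example through a custom `WindowAssigner`, is not shown here and would need its own design.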