[
https://issues.apache.org/jira/browse/FLINK-20201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
lqjacklee updated FLINK-20201:
------------------------------
Description:
{code:java}
static Map<String, Field> fieldMap;
static {
fieldMap = Stream.of(Entity.class.getDeclaredFields())
.collect(Collectors.toMap(Field::getName, field -> field));
}
public static class Entity extends PojoTypeInfo<Entity> implements
Comparable<Entity> {
public String name;
public long currentDate;
public int purchaseVolume;
public Entity(String name, int purchaseVolume) {
super(Entity.class, Arrays.asList(new
PojoField(fieldMap.get("name"), BasicTypeInfo.STRING_TYPE_INFO),
new PojoField(fieldMap.get("purchaseVolume"),
BasicTypeInfo.INT_TYPE_INFO),
new PojoField(fieldMap.get("currentDate"),
BasicTypeInfo.DATE_TYPE_INFO)
));
this.name = name;
this.purchaseVolume = purchaseVolume;
this.currentDate = System.currentTimeMillis();
}
@Override
public int compareTo(Entity o) {
return Long.compare(currentDate, o.currentDate);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getCurrentDate() {
return currentDate;
}
public void setCurrentDate(long currentDate) {
this.currentDate = currentDate;
}
public int getPurchaseVolume() {
return purchaseVolume;
}
public void setPurchaseVolume(int purchaseVolume) {
this.purchaseVolume = purchaseVolume;
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(new Entity("jack", 10), new Entity("tom", 20), new
Entity("jack", 30))
.keyBy((KeySelector<Entity, String>) value -> value.name,
BasicTypeInfo.STRING_TYPE_INFO)
.timeWindow(Time.milliseconds(1))
.process(new ProcessWindowFunction<Entity, Integer, String,
TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<Entity>
elements, Collector<Integer> out) throws Exception {
StreamSupport.stream(elements.spliterator(), false)
.map(Entity::getPurchaseVolume)
.iterator()
.forEachRemaining(out::collect);
}
})
.print();
env.execute("window aggregate");
}
{code}
I provide the sample code to show how to use the window parameter in the Flink.
we want to provide the function to adjust the size of the window parameter.
Because the speed of data generation changes greatly, we hope to dynamically
adjust the size of the window according to different data volume.
The window parameter should be generated by machine learning. in that case, we
can just provide the window time range to user to configure.
was:
{code:java}
static Map<String, Field> fieldMap;
static {
fieldMap = Stream.of(Entity.class.getDeclaredFields())
.collect(Collectors.toMap(Field::getName, field -> field));
}
public static class Entity extends PojoTypeInfo<Entity> implements
Comparable<Entity> {
public String name;
public long currentDate;
public int purchaseVolume;
public Entity(String name, int purchaseVolume) {
super(Entity.class, Arrays.asList(new
PojoField(fieldMap.get("name"), BasicTypeInfo.STRING_TYPE_INFO),
new PojoField(fieldMap.get("purchaseVolume"),
BasicTypeInfo.INT_TYPE_INFO),
new PojoField(fieldMap.get("currentDate"),
BasicTypeInfo.DATE_TYPE_INFO)
));
this.name = name;
this.purchaseVolume = purchaseVolume;
this.currentDate = System.currentTimeMillis();
}
@Override
public int compareTo(Entity o) {
return Long.compare(currentDate, o.currentDate);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getCurrentDate() {
return currentDate;
}
public void setCurrentDate(long currentDate) {
this.currentDate = currentDate;
}
public int getPurchaseVolume() {
return purchaseVolume;
}
public void setPurchaseVolume(int purchaseVolume) {
this.purchaseVolume = purchaseVolume;
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(new Entity("jack", 10), new Entity("tom", 20), new
Entity("jack", 30))
.keyBy((KeySelector<Entity, String>) value -> value.name,
BasicTypeInfo.STRING_TYPE_INFO)
.timeWindow(Time.milliseconds(1))
.process(new ProcessWindowFunction<Entity, Integer, String,
TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<Entity>
elements, Collector<Integer> out) throws Exception {
StreamSupport.stream(elements.spliterator(), false)
.map(Entity::getPurchaseVolume)
.iterator()
.forEachRemaining(out::collect);
}
})
.print();
env.execute("window aggregate");
}
{code}
Because the speed of data generation changes greatly, we hope to dynamically
adjust the size of the window according to different data volume.
The window parameter should be generated by machine learning. in that case, we
can just provide the window time range to user to configure.
> Support automatic adjustment of window parameters
> -------------------------------------------------
>
> Key: FLINK-20201
> URL: https://issues.apache.org/jira/browse/FLINK-20201
> Project: Flink
> Issue Type: Wish
> Components: Runtime / Coordination
> Affects Versions: 1.11.2
> Reporter: lqjacklee
> Priority: Major
>
> {code:java}
> static Map<String, Field> fieldMap;
> static {
> fieldMap = Stream.of(Entity.class.getDeclaredFields())
> .collect(Collectors.toMap(Field::getName, field -> field));
> }
> public static class Entity extends PojoTypeInfo<Entity> implements
> Comparable<Entity> {
> public String name;
> public long currentDate;
> public int purchaseVolume;
> public Entity(String name, int purchaseVolume) {
> super(Entity.class, Arrays.asList(new
> PojoField(fieldMap.get("name"), BasicTypeInfo.STRING_TYPE_INFO),
> new PojoField(fieldMap.get("purchaseVolume"),
> BasicTypeInfo.INT_TYPE_INFO),
> new PojoField(fieldMap.get("currentDate"),
> BasicTypeInfo.DATE_TYPE_INFO)
> ));
> this.name = name;
> this.purchaseVolume = purchaseVolume;
> this.currentDate = System.currentTimeMillis();
> }
> @Override
> public int compareTo(Entity o) {
> return Long.compare(currentDate, o.currentDate);
> }
> public String getName() {
> return name;
> }
> public void setName(String name) {
> this.name = name;
> }
> public long getCurrentDate() {
> return currentDate;
> }
> public void setCurrentDate(long currentDate) {
> this.currentDate = currentDate;
> }
> public int getPurchaseVolume() {
> return purchaseVolume;
> }
> public void setPurchaseVolume(int purchaseVolume) {
> this.purchaseVolume = purchaseVolume;
> }
> }
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env
> .fromElements(new Entity("jack", 10), new Entity("tom", 20), new
> Entity("jack", 30))
> .keyBy((KeySelector<Entity, String>) value -> value.name,
> BasicTypeInfo.STRING_TYPE_INFO)
> .timeWindow(Time.milliseconds(1))
> .process(new ProcessWindowFunction<Entity, Integer, String,
> TimeWindow>() {
> @Override
> public void process(String s, Context context,
> Iterable<Entity> elements, Collector<Integer> out) throws Exception {
> StreamSupport.stream(elements.spliterator(), false)
> .map(Entity::getPurchaseVolume)
> .iterator()
> .forEachRemaining(out::collect);
> }
> })
> .print();
> env.execute("window aggregate");
> }
> {code}
> I provide the sample code to show how to use the window parameter in the
> Flink. we want to provide the function to adjust the size of the window
> parameter.
> Because the speed of data generation changes greatly, we hope to dynamically
> adjust the size of the window according to different data volume.
> The window parameter should be generated by machine learning. in that case,
> we can just provide the window time range to user to configure.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)