[
https://issues.apache.org/jira/browse/FLINK-4101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Vasia Kalavri closed FLINK-4101.
--------------------------------
Resolution: Invalid
> Calculating average in Flink DataStream on window time
> ------------------------------------------------------
>
> Key: FLINK-4101
> URL: https://issues.apache.org/jira/browse/FLINK-4101
> Project: Flink
> Issue Type: Task
> Components: DataStream API
> Affects Versions: 1.0.2
> Reporter: Akshay Shingote
>
> I am using Flink DataStream API where there where racks are available & I
> want to calculate "average"of temperature group by rack IDs. My window
> duration is of 40 seconds & my window is sliding every 10 seconds...Following
> is my code where I am calculating sum of temperatures every 10 seconds for
> every rackID,but now I want to calculate average temperatures::
> static Properties properties=new Properties();
> public static Properties getProperties()
> {
> properties.setProperty("bootstrap.servers", "54.164.200.104:9092");
> properties.setProperty("zookeeper.connect", "54.164.200.104:2181");
> //properties.setProperty("deserializer.class",
> "kafka.serializer.StringEncoder");
> //properties.setProperty("group.id", "akshay");
> properties.setProperty("auto.offset.reset", "earliest");
> return properties;
> }
> @SuppressWarnings("rawtypes")
> public static void main(String[] args) throws Exception
> {
> StreamExecutionEnvironment
> env=StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> Properties props=Program.getProperties();
> DataStream<TemperatureEvent> dstream=env.addSource(new
> FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new
> TemperatureEventSchema(), props)).assignTimestampsAndWatermarks(new
> IngestionTimeExtractor<>());
> DataStream<TemperatureEvent>
> ds1=dstream.keyBy("rackId").timeWindow(Time.seconds(40),
> Time.seconds(10)).sum("temperature");
> env.execute("Temperature Consumer");
> }
> How can I calcluate average temperature for the above example ??
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)