[ https://issues.apache.org/jira/browse/FLINK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
fengge updated FLINK-10915:
---------------------------
    Description:
{code:java}
// code placeholder
{code}
(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        _ (.enableCheckpointing flink-env 13000)
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
        ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
        ]
    (prn "Starting stateful stream computation 1" flink-env)
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))

  was:
```
(defn -main [& args]
  (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
        _ (.enableCheckpointing flink-env 13000)
        sources (.addSource flink-env
                            (RocketMQSource.
                              (SimpleKeyValueDeserializationSchema. "msgid" "body")
                              (gen-consumer-properties)))
        _ (.name sources "ririri")
        _ (.setParallelism sources 1)
        ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
        _ (.name ednds "ccc")
        _ (.setParallelism ednds 1)
        _ (.print ednds)
        ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
        ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
        ]
    (prn "Starting stateful stream computation 1" flink-env)
    ;(.setParallelism ds 1)
    ;(.setParallelism ednds 1)
    ;(.print counts)
    ;(.print secondcounts)
    (.execute flink-env "rocketmq-flink-feng2")))
```

> clojure context.collectWithTimestamp Will be blocked.
> --------------------------------------------------------
>
>                 Key: FLINK-10915
>                 URL: https://issues.apache.org/jira/browse/FLINK-10915
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client
>    Affects Versions: 1.6.2
>            Reporter: fengge
>            Priority: Minor
>
> {code:java}
> // code placeholder
> {code}
> (defn -main [& args]
>   (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
>         _ (.enableCheckpointing flink-env 13000)
>         sources (.addSource flink-env
>                             (RocketMQSource.
>                               (SimpleKeyValueDeserializationSchema. "msgid" "body")
>                               (gen-consumer-properties)))
>         _ (.name sources "ririri")
>         _ (.setParallelism sources 1)
>         ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
>         _ (.name ednds "ccc")
>         _ (.setParallelism ednds 1)
>         _ (.print ednds)
>         ;counts (.sum (.timeWindow (.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
>         ;secondcounts (.sum (.timeWindow (.keyBy counts (int-array [0])) (Time/minutes 2)) 2)
>         ]
>     (prn "Starting stateful stream computation 1" flink-env)
>     ;(.setParallelism ds 1)
>     ;(.setParallelism ednds 1)
>     ;(.print counts)
>     ;(.print secondcounts)
>     (.execute flink-env "rocketmq-flink-feng2")))

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)