[ 
https://issues.apache.org/jira/browse/FLINK-10915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fengge updated FLINK-10915:
---------------------------
    Description: 
{code:java}
//代码占位符
{code}
(defn -main [& args] (let [flink-env 
(StreamExecutionEnvironment/getExecutionEnvironment) _ (.enableCheckpointing 
flink-env 13000) sources (.addSource flink-env (RocketMQSource. 
(SimpleKeyValueDeserializationSchema. "msgid" "body") 
(gen-consumer-properties))) _ (.name sources "ririri") _ (.setParallelism 
sources 1) ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3) 
_ (.name ednds "ccc") _ (.setParallelism ednds 1) _ (.print ednds) ;counts 
(.sum (.timeWindow(.keyBy ednds (int-array [0])) (Time/seconds 10)) 2) 
;secondcounts (.sum (.timeWindow(.keyBy counts (int-array [0])) (Time/minutes 
2)) 2) ] (prn "开始有状态的流式计算1" flink-env) ;(.setParallelism ds 1) 
;(.setParallelism ednds 1) ;(.print counts) ;(.print secondcounts) (.execute 
flink-env"rocketmq-flink-feng2") ) )

  was:
```

(defn -main [& args]
 (let [flink-env (StreamExecutionEnvironment/getExecutionEnvironment)
 _ (.enableCheckpointing flink-env 13000)
 sources (.addSource flink-env
 (RocketMQSource. (SimpleKeyValueDeserializationSchema. "msgid" "body") 
 (gen-consumer-properties)))
 _ (.name sources "ririri")
 _ (.setParallelism sources 1)
 ednds (.returns (.flatMap sources (Cflatmapfunction.)) CloudTuple3)
 _ (.name ednds "ccc")
 _ (.setParallelism ednds 1)
 _ (.print ednds)
 ;counts (.sum (.timeWindow(.keyBy ednds (int-array [0])) (Time/seconds 10)) 2)
 ;secondcounts (.sum (.timeWindow(.keyBy counts (int-array [0])) (Time/minutes 
2)) 2)
 ]
 (prn "开始有状态的流式计算1" flink-env)
 ;(.setParallelism ds 1)
 ;(.setParallelism ednds 1)
 ;(.print counts)
 ;(.print secondcounts)
 (.execute flink-env"rocketmq-flink-feng2")
 )
 )

```


> clojure   context.collectWithTimestamp  Will be blocked.
> --------------------------------------------------------
>
>                 Key: FLINK-10915
>                 URL: https://issues.apache.org/jira/browse/FLINK-10915
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client
>    Affects Versions: 1.6.2
>            Reporter: fengge
>            Priority: Minor
>
> {code:java}
> //代码占位符
> {code}
> (defn -main [& args] (let [flink-env 
> (StreamExecutionEnvironment/getExecutionEnvironment) _ (.enableCheckpointing 
> flink-env 13000) sources (.addSource flink-env (RocketMQSource. 
> (SimpleKeyValueDeserializationSchema. "msgid" "body") 
> (gen-consumer-properties))) _ (.name sources "ririri") _ (.setParallelism 
> sources 1) ednds (.returns (.flatMap sources (Cflatmapfunction.)) 
> CloudTuple3) _ (.name ednds "ccc") _ (.setParallelism ednds 1) _ (.print 
> ednds) ;counts (.sum (.timeWindow(.keyBy ednds (int-array [0])) (Time/seconds 
> 10)) 2) ;secondcounts (.sum (.timeWindow(.keyBy counts (int-array [0])) 
> (Time/minutes 2)) 2) ] (prn "开始有状态的流式计算1" flink-env) ;(.setParallelism ds 1) 
> ;(.setParallelism ednds 1) ;(.print counts) ;(.print secondcounts) (.execute 
> flink-env"rocketmq-flink-feng2") ) )



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to