This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 42a5d6cb1fbd8a584e023546e73730055422f1e9 Author: vongosling <[email protected]> AuthorDate: Mon Jun 3 11:36:15 2019 +0800 Verify flink prs --- .../org/apache/rocketmq/flink/RocketMQSource.java | 2 -- .../example}/example/RocketMQFlinkExample.java | 27 +++++++++--------- .../flink/example/example/SimpleConsumer.java} | 21 +++++++------- .../flink/example/example/SimpleProducer.java} | 33 ++++++++-------------- style/rmq_checkstyle.xml | 2 ++ 5 files changed, 37 insertions(+), 48 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index ccd6bb4..14c479b 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -318,8 +318,6 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> restoredOffsets = new ConcurrentHashMap<>(); } for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) { - // unionOffsetStates is the restored global union state; - // should only snapshot mqs that actually belong to us if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) { restoredOffsets.put(mqOffsets.f0, mqOffsets.f1); } diff --git a/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java b/src/main/java/org/apache/rocketmq/flink/example/example/RocketMQFlinkExample.java similarity index 81% rename from src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java rename to src/main/java/org/apache/rocketmq/flink/example/example/RocketMQFlinkExample.java index f4f654e..92b8dbf 100644 --- a/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java +++ b/src/main/java/org/apache/rocketmq/flink/example/example/RocketMQFlinkExample.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.rocketmq.flink.example; +package org.apache.rocketmq.flink.example.example; import java.util.HashMap; import java.util.Map; @@ -36,7 +35,6 @@ public class RocketMQFlinkExample { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // enable checkpoint env.enableCheckpointing(3000); Properties consumerProps = new Properties(); @@ -47,7 +45,7 @@ public class RocketMQFlinkExample { Properties producerProps = new Properties(); producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876"); int msgDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL05; - producerProps.setProperty(RocketMQConfig.MSG_DELAY_LEVEL,String.valueOf(msgDelayLevel)); + producerProps.setProperty(RocketMQConfig.MSG_DELAY_LEVEL, String.valueOf(msgDelayLevel)); // TimeDelayLevel is not supported for batching boolean batchFlag = msgDelayLevel <= 0; @@ -60,7 +58,7 @@ public class RocketMQFlinkExample { HashMap result = new HashMap(); result.put("id", in.get("id")); String[] arr = in.get("address").toString().split("\\s+"); - result.put("province", arr[arr.length-1]); + result.put("province", arr[arr.length - 1]); out.collect(result); } }) @@ -73,7 +71,8 @@ public class RocketMQFlinkExample { try { env.execute("rocketmq-flink-example"); - } catch (Exception e) { + } + catch (Exception e) { e.printStackTrace(); } } diff --git a/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java b/src/main/java/org/apache/rocketmq/flink/example/example/SimpleConsumer.java similarity index 75% rename from src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java rename to src/main/java/org/apache/rocketmq/flink/example/example/SimpleConsumer.java index 1b07b8d..c087513 100644 --- a/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java +++ b/src/main/java/org/apache/rocketmq/flink/example/example/SimpleConsumer.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation 
(ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.rocketmq.flink.example; +package org.apache.rocketmq.flink.example.example; import java.util.List; @@ -27,7 +26,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; -public class ConsumerTest { +public class SimpleConsumer { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g00003"); consumer.setNamesrvAddr("localhost:9876"); diff --git a/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java b/src/main/java/org/apache/rocketmq/flink/example/example/SimpleProducer.java similarity index 52% rename from src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java rename to src/main/java/org/apache/rocketmq/flink/example/example/SimpleProducer.java index c04ca74..5a6b572 100644 --- a/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java +++ b/src/main/java/org/apache/rocketmq/flink/example/example/SimpleProducer.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,15 +15,13 @@ * limitations under the License. */ -package org.apache.rocketmq.flink.example; +package org.apache.rocketmq.flink.example.example; -import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.remoting.exception.RemotingException; -public class ProducerTest { +public class SimpleProducer { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("p001"); producer.setNamesrvAddr("localhost:9876"); @@ -34,16 +31,10 @@ public class ProducerTest { e.printStackTrace(); } for (int i = 0; i < 10000; i++) { - Message msg = new Message("flink-source2" , "", "id_"+i, ("country_X province_" + i).getBytes()); + Message msg = new Message("flink-source2", "", "id_" + i, ("country_X province_" + i).getBytes()); try { producer.send(msg); - } catch (MQClientException e) { - e.printStackTrace(); - } catch (RemotingException e) { - e.printStackTrace(); - } catch (MQBrokerException e) { - e.printStackTrace(); - } catch (InterruptedException e) { + } catch (Exception e) { e.printStackTrace(); } System.out.println("send " + i); diff --git a/style/rmq_checkstyle.xml b/style/rmq_checkstyle.xml index e3155cc..d5d591d 100644 --- a/style/rmq_checkstyle.xml +++ b/style/rmq_checkstyle.xml @@ -32,10 +32,12 @@ <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/> </module> + <!-- <module name="RegexpSingleline"> <property name="format" value="System\.out\.println"/> <property name="message" value="Prohibit invoking System.out.println in 
source code !"/> </module> + --> <module name="RegexpSingleline"> <property name="format" value="//FIXME"/>
