This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 42a5d6cb1fbd8a584e023546e73730055422f1e9
Author: vongosling <[email protected]>
AuthorDate: Mon Jun 3 11:36:15 2019 +0800

    Verify flink prs
---
 .../org/apache/rocketmq/flink/RocketMQSource.java  |  2 --
 .../example}/example/RocketMQFlinkExample.java     | 27 +++++++++---------
 .../flink/example/example/SimpleConsumer.java}     | 21 +++++++-------
 .../flink/example/example/SimpleProducer.java}     | 33 ++++++++--------------
 style/rmq_checkstyle.xml                           |  2 ++
 5 files changed, 37 insertions(+), 48 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java 
b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index ccd6bb4..14c479b 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -318,8 +318,6 @@ public class RocketMQSource<OUT> extends 
RichParallelSourceFunction<OUT>
                 restoredOffsets = new ConcurrentHashMap<>();
             }
             for (Tuple2<MessageQueue, Long> mqOffsets : 
unionOffsetStates.get()) {
-                // unionOffsetStates is the restored global union state;
-                // should only snapshot mqs that actually belong to us
                 if (!restoredOffsets.containsKey(mqOffsets.f0) || 
restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
                     restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
                 }
diff --git 
a/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java 
b/src/main/java/org/apache/rocketmq/flink/example/example/RocketMQFlinkExample.java
similarity index 81%
rename from 
src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
rename to 
src/main/java/org/apache/rocketmq/flink/example/example/RocketMQFlinkExample.java
index f4f654e..92b8dbf 100644
--- a/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
+++ 
b/src/main/java/org/apache/rocketmq/flink/example/example/RocketMQFlinkExample.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.example;
+package org.apache.rocketmq.flink.example.example;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -36,7 +35,6 @@ public class RocketMQFlinkExample {
     public static void main(String[] args) {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-        // enable checkpoint
         env.enableCheckpointing(3000);
 
         Properties consumerProps = new Properties();
@@ -47,7 +45,7 @@ public class RocketMQFlinkExample {
         Properties producerProps = new Properties();
         producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, 
"localhost:9876");
         int msgDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL05;
-        
producerProps.setProperty(RocketMQConfig.MSG_DELAY_LEVEL,String.valueOf(msgDelayLevel));
+        producerProps.setProperty(RocketMQConfig.MSG_DELAY_LEVEL, 
String.valueOf(msgDelayLevel));
         // TimeDelayLevel is not supported for batching
         boolean batchFlag = msgDelayLevel <= 0;
 
@@ -60,7 +58,7 @@ public class RocketMQFlinkExample {
                     HashMap result = new HashMap();
                     result.put("id", in.get("id"));
                     String[] arr = in.get("address").toString().split("\\s+");
-                    result.put("province", arr[arr.length-1]);
+                    result.put("province", arr[arr.length - 1]);
                     out.collect(result);
                 }
             })
@@ -73,7 +71,8 @@ public class RocketMQFlinkExample {
 
         try {
             env.execute("rocketmq-flink-example");
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             e.printStackTrace();
         }
     }
diff --git a/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java 
b/src/main/java/org/apache/rocketmq/flink/example/example/SimpleConsumer.java
similarity index 75%
rename from src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java
rename to 
src/main/java/org/apache/rocketmq/flink/example/example/SimpleConsumer.java
index 1b07b8d..c087513 100644
--- a/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java
+++ 
b/src/main/java/org/apache/rocketmq/flink/example/example/SimpleConsumer.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.example;
+package org.apache.rocketmq.flink.example.example;
 
 import java.util.List;
 
@@ -27,7 +26,7 @@ import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 
-public class ConsumerTest {
+public class SimpleConsumer {
     public static void main(String[] args) {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g00003");
         consumer.setNamesrvAddr("localhost:9876");
diff --git a/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java 
b/src/main/java/org/apache/rocketmq/flink/example/example/SimpleProducer.java
similarity index 52%
rename from src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java
rename to 
src/main/java/org/apache/rocketmq/flink/example/example/SimpleProducer.java
index c04ca74..5a6b572 100644
--- a/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java
+++ 
b/src/main/java/org/apache/rocketmq/flink/example/example/SimpleProducer.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,15 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.example;
+package org.apache.rocketmq.flink.example.example;
 
-import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.exception.RemotingException;
 
-public class ProducerTest {
+public class SimpleProducer {
     public static void main(String[] args) {
         DefaultMQProducer producer = new DefaultMQProducer("p001");
         producer.setNamesrvAddr("localhost:9876");
@@ -34,16 +31,10 @@ public class ProducerTest {
             e.printStackTrace();
         }
         for (int i = 0; i < 10000; i++) {
-            Message msg = new Message("flink-source2" , "", "id_"+i, 
("country_X province_" + i).getBytes());
+            Message msg = new Message("flink-source2", "", "id_" + i, 
("country_X province_" + i).getBytes());
             try {
                 producer.send(msg);
-            } catch (MQClientException e) {
-                e.printStackTrace();
-            } catch (RemotingException e) {
-                e.printStackTrace();
-            } catch (MQBrokerException e) {
-                e.printStackTrace();
-            } catch (InterruptedException e) {
+            } catch (Exception e) {
                 e.printStackTrace();
             }
             System.out.println("send " + i);
diff --git a/style/rmq_checkstyle.xml b/style/rmq_checkstyle.xml
index e3155cc..d5d591d 100644
--- a/style/rmq_checkstyle.xml
+++ b/style/rmq_checkstyle.xml
@@ -32,10 +32,12 @@
         <property name="header" value="/\*\nLicensed to the Apache Software 
Foundation*"/>
     </module>
 
+    <!--
     <module name="RegexpSingleline">
         <property name="format" value="System\.out\.println"/>
         <property name="message" value="Prohibit invoking System.out.println 
in source code !"/>
     </module>
+    -->
 
     <module name="RegexpSingleline">
         <property name="format" value="//FIXME"/>

Reply via email to