This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5f6e93cc5c43e7d583156626094a8706eac75d1d
Author: etherge <[email protected]>
AuthorDate: Thu Jan 23 19:02:00 2020 -0500

    minor, sonar issues, volatile variables should not be used with compound operators
---
 .../core/consumer/StreamingConsumerChannel.java    | 33 +++++++++++-----------
 1 file changed, 17 insertions(+), 16 deletions(-)

diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
index f5d70e3..b27d830 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.stream.core.exception.StreamingException;
 import org.apache.kylin.stream.core.metrics.StreamingMetrics;
@@ -52,10 +53,10 @@ public class StreamingConsumerChannel implements Runnable {
     private StreamingSegmentManager cubeSegmentManager;
     private volatile IStopConsumptionCondition stopCondition;
     private volatile long minAcceptEventTime;
-    private volatile long parseEventErrorCnt;
-    private volatile long addEventErrorCnt;
-    private volatile long incomingEventCnt;
-    private volatile long dropEventCnt;
+    private AtomicLong parseEventErrorCnt = new AtomicLong(0);
+    private AtomicLong addEventErrorCnt = new AtomicLong(0);
+    private AtomicLong incomingEventCnt = new AtomicLong(0);
+    private AtomicLong dropEventCnt = new AtomicLong(0);
     private Map<Integer, Meter> eventConsumeMeters;
 
     public StreamingConsumerChannel(String cubeName, IStreamingConnector 
connector,
@@ -96,7 +97,7 @@ public class StreamingConsumerChannel implements Runnable {
                         Thread.sleep(100);
                         continue;
                     }
-                    incomingEventCnt++;
+                    incomingEventCnt.incrementAndGet();
                     
recordConsumeMetric(event.getSourcePosition().getPartition(), 
event.getParams());
                     if (!stopCondition.isSatisfied(event)) {
                         if (!isFilter(event)) {
@@ -107,16 +108,16 @@ public class StreamingConsumerChannel implements Runnable 
{
                         this.stopped = true;
                     }
                 } catch (MessageFormatException mfe) {
-                    parseEventErrorCnt++;
-                    if (parseEventErrorCnt % 1000 < 3) {
+                    long countValue = parseEventErrorCnt.incrementAndGet();
+                    if (countValue % 1000 < 3) {
                         logger.error(mfe.getMessage(), mfe);
                     }
                 } catch (InterruptedException ie) {
                     logger.warn("interrupted!");
                     stopped = true;
                 } catch (Exception e) {
-                    addEventErrorCnt++;
-                    if (addEventErrorCnt % 1000 < 3) {
+                    long countValue = addEventErrorCnt.incrementAndGet();
+                    if (countValue % 1000 < 3) {
                         logger.error("error happens when save event:" + event, 
e);
                     }
                 }
@@ -146,8 +147,7 @@ public class StreamingConsumerChannel implements Runnable {
     private void removeMetrics() {
         for (Map.Entry<Integer, Meter> meterEntry : 
eventConsumeMeters.entrySet()) {
             
StreamingMetrics.getInstance().getMetricRegistry().remove(MetricRegistry
-                    .name(StreamingMetrics.CONSUME_RATE_PFX,
-                    cubeName, String.valueOf(meterEntry.getKey())));
+                    .name(StreamingMetrics.CONSUME_RATE_PFX, cubeName, 
String.valueOf(meterEntry.getKey())));
         }
     }
 
@@ -163,9 +163,9 @@ public class StreamingConsumerChannel implements Runnable {
 
     private boolean isFilter(StreamingMessage event) {
         if (minAcceptEventTime != 0 && event.getTimestamp() < 
minAcceptEventTime) {
-            dropEventCnt++;
+            long countValue = dropEventCnt.incrementAndGet();
             // drop events is less than the min accepted event time
-            if (dropEventCnt % 1000 <= 1) {
+            if (countValue % 1000 <= 1) {
                 logger.warn("event dropped, event time {}, min event accept 
time {}", event.getTimestamp(),
                         minAcceptEventTime);
             }
@@ -269,7 +269,8 @@ public class StreamingConsumerChannel implements Runnable {
     }
 
     public ConsumerStats getConsumerStats() {
-        Map<Integer, Long> consumeLagMap = 
connector.getSource().calConsumeLag(cubeName, 
cubeSegmentManager.getConsumePosition());
+        Map<Integer, Long> consumeLagMap = 
connector.getSource().calConsumeLag(cubeName,
+                cubeSegmentManager.getConsumePosition());
         long totalLag = 0L;
         Map<Integer, PartitionConsumeStats> partitionConsumeStatsMap = 
Maps.newHashMap();
         for (Map.Entry<Integer, Meter> meterEntry : 
eventConsumeMeters.entrySet()) {
@@ -295,8 +296,8 @@ public class StreamingConsumerChannel implements Runnable {
         ConsumerStats stats = new ConsumerStats();
         stats.setStopped(stopped);
         stats.setPaused(paused);
-        stats.setTotalIncomingEvents(incomingEventCnt);
-        stats.setTotalExceptionEvents(parseEventErrorCnt + addEventErrorCnt);
+        stats.setTotalIncomingEvents(incomingEventCnt.get());
+        stats.setTotalExceptionEvents(parseEventErrorCnt.get() + 
addEventErrorCnt.get());
         stats.setPartitionConsumeStatsMap(partitionConsumeStatsMap);
         stats.setConsumeOffsetInfo(getSourceConsumeInfo());
         stats.setConsumeLag(totalLag);

Reply via email to