This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5f6e93cc5c43e7d583156626094a8706eac75d1d Author: etherge <[email protected]> AuthorDate: Thu Jan 23 19:02:00 2020 -0500 minor, sonar issues, volatile variables should not be used with compound operators --- .../core/consumer/StreamingConsumerChannel.java | 33 +++++++++++----------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java index f5d70e3..b27d830 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.kylin.stream.core.exception.StreamingException; import org.apache.kylin.stream.core.metrics.StreamingMetrics; @@ -52,10 +53,10 @@ public class StreamingConsumerChannel implements Runnable { private StreamingSegmentManager cubeSegmentManager; private volatile IStopConsumptionCondition stopCondition; private volatile long minAcceptEventTime; - private volatile long parseEventErrorCnt; - private volatile long addEventErrorCnt; - private volatile long incomingEventCnt; - private volatile long dropEventCnt; + private AtomicLong parseEventErrorCnt = new AtomicLong(0); + private AtomicLong addEventErrorCnt = new AtomicLong(0); + private AtomicLong incomingEventCnt = new AtomicLong(0); + private AtomicLong dropEventCnt = new AtomicLong(0); private Map<Integer, Meter> eventConsumeMeters; public StreamingConsumerChannel(String cubeName, IStreamingConnector connector, @@ -96,7 +97,7 @@ public class StreamingConsumerChannel implements Runnable { Thread.sleep(100); continue; } - incomingEventCnt++; + incomingEventCnt.incrementAndGet(); recordConsumeMetric(event.getSourcePosition().getPartition(), event.getParams()); if (!stopCondition.isSatisfied(event)) { if (!isFilter(event)) { @@ -107,16 +108,16 @@ public class StreamingConsumerChannel implements Runnable { this.stopped = true; } } catch (MessageFormatException mfe) { - parseEventErrorCnt++; - if (parseEventErrorCnt % 1000 < 3) { + long countValue = parseEventErrorCnt.incrementAndGet(); + if (countValue % 1000 < 3) { logger.error(mfe.getMessage(), mfe); } } catch (InterruptedException ie) { logger.warn("interrupted!"); stopped = true; } catch (Exception e) { - addEventErrorCnt++; - if (addEventErrorCnt % 1000 < 3) { + long countValue = addEventErrorCnt.incrementAndGet(); + if (countValue % 1000 < 3) { logger.error("error happens when save event:" + event, e); } } @@ -146,8 +147,7 @@ public class StreamingConsumerChannel implements Runnable { private void removeMetrics() { for (Map.Entry<Integer, Meter> meterEntry : eventConsumeMeters.entrySet()) { StreamingMetrics.getInstance().getMetricRegistry().remove(MetricRegistry - .name(StreamingMetrics.CONSUME_RATE_PFX, - cubeName, String.valueOf(meterEntry.getKey()))); + .name(StreamingMetrics.CONSUME_RATE_PFX, cubeName, String.valueOf(meterEntry.getKey()))); } } @@ -163,9 +163,9 @@ public class StreamingConsumerChannel implements Runnable { private boolean isFilter(StreamingMessage event) { if (minAcceptEventTime != 0 && event.getTimestamp() < minAcceptEventTime) { - dropEventCnt++; + long countValue = dropEventCnt.incrementAndGet(); // drop events is less than the min accepted event time - if (dropEventCnt % 1000 <= 1) { + if (countValue % 1000 <= 1) { logger.warn("event dropped, event time {}, min event accept time {}", event.getTimestamp(), minAcceptEventTime); } @@ -269,7 +269,8 @@ public class StreamingConsumerChannel implements Runnable { } public ConsumerStats getConsumerStats() { - Map<Integer, Long> consumeLagMap = connector.getSource().calConsumeLag(cubeName, cubeSegmentManager.getConsumePosition()); + Map<Integer, Long> consumeLagMap = connector.getSource().calConsumeLag(cubeName, + cubeSegmentManager.getConsumePosition()); long totalLag = 0L; Map<Integer, PartitionConsumeStats> partitionConsumeStatsMap = Maps.newHashMap(); for (Map.Entry<Integer, Meter> meterEntry : eventConsumeMeters.entrySet()) { @@ -295,8 +296,8 @@ public class StreamingConsumerChannel implements Runnable { ConsumerStats stats = new ConsumerStats(); stats.setStopped(stopped); stats.setPaused(paused); - stats.setTotalIncomingEvents(incomingEventCnt); - stats.setTotalExceptionEvents(parseEventErrorCnt + addEventErrorCnt); + stats.setTotalIncomingEvents(incomingEventCnt.get()); + stats.setTotalExceptionEvents(parseEventErrorCnt.get() + addEventErrorCnt.get()); stats.setPartitionConsumeStatsMap(partitionConsumeStatsMap); stats.setConsumeOffsetInfo(getSourceConsumeInfo()); stats.setConsumeLag(totalLag);
