Repository: incubator-kylin Updated Branches: refs/heads/devstreaming [created] bdada03f7
KYLIN-1023 kylin streaming log start end offset for each partition for data verification Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/54cab0ab Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/54cab0ab Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/54cab0ab Branch: refs/heads/devstreaming Commit: 54cab0ab4a5aed6e5c963c2465876723ba91bb9d Parents: fe9e02c Author: honma <ho...@ebay.com> Authored: Wed Sep 16 10:53:05 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Wed Sep 16 10:53:05 2015 +0800 ---------------------------------------------------------------------- .../kylin/engine/streaming/StreamingCLI.java | 2 - .../org/apache/kylin/job/tools/KafkaVerify.java | 101 +++++++++++++++++++ .../kylin/streaming/MicroStreamBatch.java | 4 + .../kylin/streaming/OneOffStreamBuilder.java | 2 +- .../apache/kylin/streaming/StreamFetcher.java | 19 ++-- .../apache/kylin/streaming/StreamingConfig.java | 2 +- 6 files changed, 119 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54cab0ab/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java index 277ee69..8bf52c1 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java @@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -/** - */ public class StreamingCLI { private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54cab0ab/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java b/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java new file mode 100644 index 0000000..ee64e66 --- /dev/null +++ b/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.tools; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.MapType; +import com.fasterxml.jackson.databind.type.SimpleType; + +/** + * only for verify kylin streaming's correctness by comparing to data in original kafka topic + */ +public class KafkaVerify { + + public static void main(String[] args) throws IOException { + + System.out.println("start"); + + ObjectMapper mapper = new ObjectMapper(); + JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class)); + + long start = Long.valueOf(args[0]); + long end = Long.valueOf(args[1]); + long interval = Long.valueOf(args[2]); + int bucket = (int) ((end - start + interval - 1) / interval); + + long qtySum[] = new long[bucket]; + long qtyTotal = 0; + long counts[] = new long[bucket]; + long countTotal = 0; + long processed = 0; + long minOffset = -1; + long maxOffset = -1; + + try (BufferedReader br = new BufferedReader(new FileReader(new File(args[3])))) { + String s; + while ((s = br.readLine()) != null) { + // process the line. + if (++processed % 10000 == 1) { + System.out.println("processing " + processed); + } + + Map<String, String> root = mapper.readValue(s, mapType); + String tsStr = root.get("sys_ts"); + + if (StringUtils.isEmpty(tsStr)) { + continue; + } + long ts = Long.valueOf(tsStr); + if (ts < start || ts >= end) { + continue; + } + + if (minOffset == -1) { + minOffset = processed - 1; + } + maxOffset = processed - 1; + + long qty = Long.valueOf(root.get("qty")); + int index = (int) ((ts - start) / interval); + qtySum[index] += qty; + qtyTotal += qty; + counts[index]++; + countTotal++; + } + } + + System.out.println("qty sum is " + Arrays.toString(qtySum)); + System.out.println("qty total is " + qtyTotal); + System.out.println("count is " + Arrays.toString(counts)); + System.out.println("count total is " + countTotal); + System.out.println("first processed is " + minOffset); + System.out.println("last processed is " + maxOffset); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54cab0ab/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java index f4d9e05..27f817e 100644 --- a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java +++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java @@ -62,6 +62,10 @@ public final class MicroStreamBatch { return this.rawMessageCount; } + public final int getFilteredMessageCount() { + return this.streams.size(); + } + public final void add(ParsedStreamMessage parsedStreamMessage) { if (offset.getFirst() > parsedStreamMessage.getOffset()) { offset.setFirst(parsedStreamMessage.getOffset()); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54cab0ab/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java index 927873a..ae0f70f 100644 --- a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java +++ b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java @@ -45,7 +45,7 @@ public class OneOffStreamBuilder implements Runnable { final List<Future<MicroStreamBatch>> futures = Lists.newLinkedList(); int partitionId = 0; for (BlockingQueue<StreamMessage> queue : queues) { - futures.add(executorService.submit(new StreamFetcher(partitionId, queue, countDownLatch, batchCondition, streamParser))); + futures.add(executorService.submit(new StreamFetcher(partitionId++, queue, countDownLatch, batchCondition, streamParser))); } countDownLatch.await(); List<MicroStreamBatch> batches = Lists.newLinkedList(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54cab0ab/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java index 85d09be..f429a49 100644 --- a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java +++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java @@ -7,6 +7,8 @@ import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** */ public class StreamFetcher implements Callable<MicroStreamBatch> { @@ -26,9 +28,6 @@ public class StreamFetcher implements Callable<MicroStreamBatch> { this.streamParser = streamParser; } - private void clearCounter() { - } - private StreamMessage peek(BlockingQueue<StreamMessage> queue, long timeout) { long t = System.currentTimeMillis(); while (true) { @@ -57,7 +56,6 @@ public class StreamFetcher implements Callable<MicroStreamBatch> { while (true) { if (microStreamBatch == null) { microStreamBatch = new MicroStreamBatch(partitionId); - clearCounter(); } StreamMessage streamMessage = peek(streamMessageQueue, 60000); if (streamMessage == null) { @@ -83,21 +81,28 @@ public class StreamFetcher implements Callable<MicroStreamBatch> { } else if (result == BatchCondition.Result.LAST_ACCEPT_FOR_BATCH) { streamMessageQueue.take(); microStreamBatch.add(parsedStreamMessage); - return microStreamBatch; + break; } else if (result == BatchCondition.Result.DISCARD) { streamMessageQueue.take(); } else if (result == BatchCondition.Result.REJECT) { - return microStreamBatch; + logger.info("Partition :" + partitionId + " rejecting message at " + parsedStreamMessage.getOffset()); + break; } } else { streamMessageQueue.take(); } } + + Preconditions.checkArgument(microStreamBatch != null, "microStreamBatch is null!"); + logger.info(String.format("Partition %d contributing %d filtered messages out from %d raw messages"// + , partitionId, microStreamBatch.getFilteredMessageCount(), microStreamBatch.getRawMessageCount())); + return microStreamBatch; + } catch (Exception e) { logger.error("build stream error, stop building", e); throw new RuntimeException("build stream error, stop building", e); } finally { - logger.info("one partition sign off"); + logger.info("partition {} sign off", partitionId); countDownLatch.countDown(); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54cab0ab/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java index 320768b..c2d5361 100644 --- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java +++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java @@ -84,7 +84,7 @@ public class StreamingConfig extends RootPersistentEntity { @JsonProperty("parserName") private String parserName; - //"configA=1;configB=2" + //"tsColName=timestamp;x=y" @JsonProperty("parserProperties") private String parserProperties;