APACHE-KYLIN-2722: Introduce a new measure, called active reservoir, for 
actively pushing metrics to reporters


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e3d8ff96
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e3d8ff96
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e3d8ff96

Branch: refs/heads/yaho-cube-planner
Commit: e3d8ff96f6ab9d806c4e58d93a052bc9f64c6e14
Parents: 47b5a0d
Author: Zhong <nju_y...@apache.org>
Authored: Tue Aug 8 22:50:54 2017 +0800
Committer: Zhong <nju_y...@apache.org>
Committed: Tue Aug 8 22:50:54 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   8 +
 core-metrics/pom.xml                            |  52 ++++
 .../kylin/metrics/lib/ActiveReservoir.java      |  40 +++
 .../metrics/lib/ActiveReservoirFilter.java      |  44 +++
 .../metrics/lib/ActiveReservoirListener.java    |  30 ++
 .../metrics/lib/ActiveReservoirReporter.java    |  51 ++++
 .../org/apache/kylin/metrics/lib/Record.java    |  51 ++++
 .../java/org/apache/kylin/metrics/lib/Sink.java |  23 ++
 .../lib/impl/AbstractActiveReservoir.java       |  68 +++++
 .../metrics/lib/impl/BaseScheduledReporter.java | 101 +++++++
 .../metrics/lib/impl/BlockingReservoir.java     | 163 +++++++++++
 .../metrics/lib/impl/InstantReservoir.java      |  74 +++++
 .../kylin/metrics/lib/impl/MetricsSystem.java   | 161 +++++++++++
 .../kylin/metrics/lib/impl/RecordEvent.java     | 272 +++++++++++++++++++
 .../metrics/lib/impl/RecordEventTimeDetail.java |  77 ++++++
 .../metrics/lib/impl/RecordEventWrapper.java    |  61 +++++
 .../kylin/metrics/lib/impl/ReporterBuilder.java |  48 ++++
 .../kylin/metrics/lib/impl/StubReservoir.java   |  54 ++++
 .../metrics/lib/impl/StubReservoirReporter.java |  51 ++++
 .../apache/kylin/metrics/lib/impl/StubSink.java |  30 ++
 .../metrics/lib/impl/TimePropertyEnum.java      |  47 ++++
 metrics-reporter-hive/pom.xml                   |  53 ++++
 .../metrics/lib/impl/hive/HiveProducer.java     | 200 ++++++++++++++
 .../lib/impl/hive/HiveProducerRecord.java       | 196 +++++++++++++
 .../lib/impl/hive/HiveReservoirReporter.java    | 139 ++++++++++
 .../kylin/metrics/lib/impl/hive/HiveSink.java   |  30 ++
 metrics-reporter-kafka/pom.xml                  |  47 ++++
 .../kafka/KafkaActiveReserviorListener.java     | 115 ++++++++
 .../lib/impl/kafka/KafkaReservoirReporter.java  | 139 ++++++++++
 .../kylin/metrics/lib/impl/kafka/KafkaSink.java |  29 ++
 pom.xml                                         |   6 +
 31 files changed, 2460 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f06e0fa..41a967f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1120,4 +1120,12 @@ abstract public class KylinConfigBase implements 
Serializable {
     public boolean isWebCrossDomainEnabled() {
         return 
Boolean.parseBoolean(getOptional("kylin.web.cross-domain-enabled", "true"));
     }
+
+    // 
============================================================================
+    // Metrics
+    // 
============================================================================
+    /**
+     * Looks up the optional property
+     * {@code kylin.core.metrics.active-reservoir-default-class}.
+     *
+     * @return the configured value, or the stub reservoir class name when the
+     *         property is not set
+     */
+    public String getMetricsActiveReservoirDefaultClass() {
+        final String fallbackReservoirClass = "org.apache.kylin.metrics.lib.impl.StubReservoir";
+        return getOptional("kylin.core.metrics.active-reservoir-default-class", fallbackReservoirClass);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/core-metrics/pom.xml b/core-metrics/pom.xml
new file mode 100644
index 0000000..44d9512
--- /dev/null
+++ b/core-metrics/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-core-metrics</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Core Metrics</name>
+    <description>Apache Kylin - Core Metrics</description>
+
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>2.1.0</version>
+    </parent>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>${dropwizard.metrics.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java
new file mode 100644
index 0000000..36ab759
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+import java.io.Closeable;
+
+/**
+ * A reservoir of metrics {@link Record}s that pushes updates to registered
+ * {@link ActiveReservoirListener}s.
+ */
+public interface ActiveReservoir extends Closeable {
+
+    /**
+     * Returns the size of this reservoir.
+     * NOTE(review): presumably the number of records currently buffered - confirm per implementation.
+     */
+    int size();
+
+    /** Hands a record to this reservoir. */
+    void update(Record record);
+
+    /** Registers a listener with this reservoir. */
+    void addListener(ActiveReservoirListener listener);
+
+    /** Unregisters the given listener from this reservoir. */
+    void removeListener(ActiveReservoirListener listener);
+
+    /** Unregisters every listener from this reservoir. */
+    void removeAllListener();
+
+    /**
+     * Sets the HA listener of this reservoir.
+     * NOTE(review): presumably a fallback used when a regular listener fails - confirm per implementation.
+     */
+    void setHAListener(ActiveReservoirListener listener);
+
+    /** Starts this reservoir. */
+    void start();
+
+    /** Stops this reservoir. */
+    void stop();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
new file mode 100644
index 0000000..5cffcfc
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+/**
+ * A filter used to determine whether or not an active reservoir should be
+ * reported, among other things.
+ */
+public interface ActiveReservoirFilter {
+
+    /**
+     * Matches all active reservoirs, regardless of type or name.
+     */
+    ActiveReservoirFilter ALL = new ActiveReservoirFilter() {
+        @Override
+        public boolean matches(String name, ActiveReservoir activeReservoir) {
+            // Accepts unconditionally; neither argument is inspected.
+            return true;
+        }
+    };
+
+    /**
+     * Returns {@code true} if the active reservoir matches the filter;
+     * {@code false} otherwise.
+     *
+     * @param name            the active reservoir's name
+     * @param activeReservoir the active reservoir
+     * @return {@code true} if the active reservoir matches the filter
+     */
+    boolean matches(String name, ActiveReservoir activeReservoir);
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java
new file mode 100644
index 0000000..f64caba
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+import java.io.Closeable;
+import java.util.EventListener;
+import java.util.List;
+
+/**
+ * Listener that is handed batches of {@link Record}s by an {@link ActiveReservoir}.
+ */
+public interface ActiveReservoirListener extends EventListener, Closeable {
+
+    /**
+     * Called with the records to be handled.
+     *
+     * @param records the updated records
+     * @return a success flag; {@code BlockingReservoir} treats {@code false} as a
+     *         failed notification and passes the same records to its HA listener
+     */
+    boolean onRecordUpdate(final List<Record> records);
+
+    /**
+     * Closes this listener. Redeclared without a {@code throws} clause, so
+     * callers do not have to handle the {@code IOException} declared by
+     * {@link Closeable#close()}.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
new file mode 100644
index 0000000..6020865
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+import java.io.Closeable;
+import java.util.regex.Pattern;
+
+import org.apache.kylin.common.util.Pair;
+
+import com.google.common.base.Strings;
+
+/**
+ * Base class for reporters of active reservoirs. Besides the start/stop
+ * life cycle it offers helpers for splitting and joining
+ * {@code database.table} names.
+ */
+public abstract class ActiveReservoirReporter implements Closeable {
+
+    /** Database name used when a table name carries no database part. */
+    public static final String KYLIN_PREFIX = "KYLIN";
+
+    /**
+     * Splits a table name on {@code '.'} into a (database, table) pair.
+     * <p>
+     * A name without a dot is paired with {@link #KYLIN_PREFIX} as its database.
+     * When more than two dot-separated parts are present only the first two are used.
+     *
+     * @param tableName the table name, optionally prefixed with {@code database.}
+     * @return the (database, table) pair, or {@code null} when {@code tableName}
+     *         is null, empty, or consists only of dots
+     */
+    public static Pair<String, String> getTableNameSplits(String tableName) {
+        if (Strings.isNullOrEmpty(tableName)) {
+            return null;
+        }
+
+        String[] splits = tableName.split(Pattern.quote("."));
+        if (splits.length == 0) {
+            // String#split drops trailing empty strings, so a dot-only name such as "."
+            // yields an empty array; treat it like an empty name instead of failing
+            // with ArrayIndexOutOfBoundsException.
+            return null;
+        }
+
+        boolean tableOnly = splits.length == 1;
+        String database = tableOnly ? KYLIN_PREFIX : splits[0];
+        String tableNameOnly = tableOnly ? splits[0] : splits[1];
+        return new Pair<>(database, tableNameOnly);
+    }
+
+    /**
+     * Joins a (database, table) pair back into {@code database.table}.
+     *
+     * @param tableNameSplits the pair to join; must not be {@code null}
+     * @return the first and second element separated by a dot
+     */
+    public static String getTableName(Pair<String, String> tableNameSplits) {
+        return tableNameSplits.getFirst() + "." + tableNameSplits.getSecond();
+    }
+
+    /** Starts this reporter. */
+    public abstract void start();
+
+    /** Stops this reporter. */
+    public abstract void stop();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java
new file mode 100644
index 0000000..a1bce1f
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+import java.util.Map;
+
+/**
+ * A single metrics record.
+ */
+public interface Record {
+
+    /**
+     * Returns the type of this record, used for classification.
+     */
+    String getType();
+
+    /**
+     * Returns the key of this record, used to keep ordering within the same category.
+     */
+    byte[] getKey();
+
+    /**
+     * Returns the contents of this record that will be used.
+     */
+    byte[] getValue();
+
+    /**
+     * Returns the raw contents of this record that will be used.
+     */
+    Map<String, Object> getValueRaw();
+
+    /**
+     * Returns the timestamp at which this record was created.
+     */
+    Long getTime();
+
+    /**
+     * Returns a clone of this record.
+     */
+    Record clone();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java
new file mode 100644
index 0000000..dff71bd
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+/**
+ * Resolves table names from subjects.
+ * NOTE(review): presumably one implementation exists per reporting target - confirm against the implementations.
+ */
+public interface Sink {
+    /**
+     * Returns the table for the given subject.
+     *
+     * @param subject the subject to look up
+     * @return the table derived from {@code subject}; the exact naming scheme is
+     *         implementation specific - TODO confirm
+     */
+    String getTableFromSubject(String subject);
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java
new file mode 100644
index 0000000..cc72710
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.List;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Skeleton {@link ActiveReservoir} that keeps the listener list, the HA
+ * listener and the ready flag; subclasses supply {@code size()} and
+ * {@code update(Record)}.
+ */
+public abstract class AbstractActiveReservoir implements ActiveReservoir {
+
+    /** Listeners registered through {@link #addListener(ActiveReservoirListener)}. */
+    protected List<ActiveReservoirListener> listeners = Lists.newArrayList();
+
+    /** HA listener; defaults to the listener of a fresh {@link StubReservoirReporter}. */
+    protected ActiveReservoirListener listenerHA = new StubReservoirReporter().listener;
+
+    /**
+     * Whether the reservoir has been started and not yet stopped.
+     * Declared {@code volatile} because {@link #start()} and {@link #stop()} may be
+     * called from a different thread than the one polling the flag (for example the
+     * reporter thread of {@code BlockingReservoir}); without it a write is not
+     * guaranteed to become visible to the polling thread.
+     */
+    protected volatile boolean isReady = false;
+
+    /** Appends the listener to the registered listeners. */
+    @Override
+    public void addListener(ActiveReservoirListener listener) {
+        listeners.add(listener);
+    }
+
+    /** Closes the listener and then removes it from the registered listeners. */
+    @Override
+    public void removeListener(ActiveReservoirListener listener) {
+        listener.close();
+        listeners.remove(listener);
+    }
+
+    /** Closes every registered listener and empties the listener list. */
+    @Override
+    public void removeAllListener() {
+        for (ActiveReservoirListener listener : listeners) {
+            listener.close();
+        }
+        listeners.clear();
+    }
+
+    /** Replaces the HA listener. */
+    @Override
+    public void setHAListener(ActiveReservoirListener listener) {
+        this.listenerHA = listener;
+    }
+
+    /** Marks the reservoir as ready. */
+    @Override
+    public void start() {
+        isReady = true;
+    }
+
+    /** Marks the reservoir as not ready. */
+    @Override
+    public void stop() {
+        isReady = false;
+    }
+
+    /** Stops the reservoir and then closes and removes all listeners. */
+    @Override
+    public void close() {
+        stop();
+        removeAllListener();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
new file mode 100644
index 0000000..3597600
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.io.Closeable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Base class for reporters that invoke {@link #report()} periodically on a
+ * scheduled executor.
+ */
+public abstract class BaseScheduledReporter implements Closeable {
+
+    private static final Logger logger = LoggerFactory.getLogger(BaseScheduledReporter.class);
+
+    private final ScheduledExecutorService executor;
+
+    /** Creates a reporter whose scheduler thread is named after {@code "default"}. */
+    BaseScheduledReporter() {
+        this("default");
+    }
+
+    /**
+     * Creates a reporter backed by a single-threaded scheduler.
+     *
+     * @param name inserted into the scheduler thread name {@code metrics-scheduler-<name>-<n>}
+     */
+    BaseScheduledReporter(String name) {
+        this(Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("metrics-scheduler-" + name + "-%d").build()));
+    }
+
+    /**
+     * Creates a reporter that schedules its polls on the given executor.
+     *
+     * @param executor the executor used by {@link #start(long, TimeUnit)}; it is shut down by {@link #stop()}
+     */
+    BaseScheduledReporter(ScheduledExecutorService executor) {
+        this.executor = executor;
+    }
+
+    /** Performs one reporting round; called on the scheduler thread. */
+    public abstract void report();
+
+    /**
+     * Starts the reporter polling at the given period.
+     *
+     * @param period the amount of time between polls
+     * @param unit   the unit for {@code period}
+     */
+    public void start(long period, TimeUnit unit) {
+        executor.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    report();
+                } catch (RuntimeException ex) {
+                    // An exception escaping the task would cancel all further executions,
+                    // so it is logged and suppressed here.
+                    logger.error("RuntimeException thrown from {}#report. Exception was suppressed.",
+                            BaseScheduledReporter.this.getClass().getSimpleName(), ex);
+                }
+            }
+        }, period, period, unit);
+    }
+
+    /**
+     * Stops the reporter and shuts down its thread of execution.
+     *
+     * Uses the shutdown pattern from
+     * http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
+     */
+    public void stop() {
+        executor.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+                executor.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+                    // Reported through the class logger rather than System.err so the
+                    // message ends up in the application log like the other diagnostics.
+                    logger.warn("{}: ScheduledExecutorService did not terminate", getClass().getSimpleName());
+                }
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            executor.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Stops the reporter and shuts down its thread of execution.
+     */
+    @Override
+    public void close() {
+        stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
new file mode 100644
index 0000000..37e2ce6
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class BlockingReservoir extends AbstractActiveReservoir {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BlockingReservoir.class);
+
+    private final BlockingQueue<Record> recordsQueue;
+    private final Thread scheduledReporter;
+    private final int MIN_REPORT_SIZE;
+    private final int MAX_REPORT_SIZE;
+    private final long MAX_REPORT_TIME;
+    private List<Record> records;
+
+    public BlockingReservoir() {
+        this(1, 100);
+    }
+
+    public BlockingReservoir(int minReportSize, int maxReportSize) {
+        this(minReportSize, maxReportSize, 10);
+    }
+
+    public BlockingReservoir(int minReportSize, int maxReportSize, int 
MAX_REPORT_TIME) {
+        this.MAX_REPORT_SIZE = maxReportSize;
+        this.MIN_REPORT_SIZE = minReportSize;
+        this.MAX_REPORT_TIME = MAX_REPORT_TIME * 60 * 1000L;
+
+        this.recordsQueue = new LinkedBlockingQueue<>();
+        this.listeners = Lists.newArrayList();
+
+        this.records = Lists.newArrayListWithExpectedSize(MAX_REPORT_SIZE);
+
+        scheduledReporter = new 
ThreadFactoryBuilder().setNameFormat("metrics-blocking-reservoir-scheduler-%d").build().newThread(new
 ReporterRunnable());
+    }
+
+    public void update(Record record) {
+        if (!isReady) {
+            logger.info("Current reservoir is not ready for update record");
+            return;
+        }
+        try {
+            recordsQueue.put(record);
+        } catch (InterruptedException e) {
+            logger.warn("Thread is interrupted during putting value to 
blocking queue. \n" + e.toString());
+        }
+    }
+
+    public int size() {
+        return recordsQueue.size();
+    }
+
+    private void onRecordUpdate(boolean ifAll) {
+        if (ifAll) {
+            records = Lists.newArrayList();
+            recordsQueue.drainTo(records);
+        } else {
+            records.clear();
+            recordsQueue.drainTo(records, MAX_REPORT_SIZE);
+        }
+
+        boolean ifSucceed = true;
+        for (ActiveReservoirListener listener : listeners) {
+            if (!notifyListenerOfUpdatedRecord(listener, records)) {
+                ifSucceed = false;
+                logger.warn("It fails to notify listener " + 
listener.toString() + " of updated records " + records.toString());
+            }
+        }
+        if (!ifSucceed) {
+            notifyListenerHAOfUpdatedRecord(records);
+        }
+    }
+
+    private boolean notifyListenerOfUpdatedRecord(ActiveReservoirListener 
listener, List<Record> records) {
+        return listener.onRecordUpdate(records);
+    }
+
+    private boolean notifyListenerHAOfUpdatedRecord(List<Record> records) {
+        logger.info("The HA listener " + listenerHA.toString() + " for updated 
records " + records.toString() + " will be started");
+        if (!notifyListenerOfUpdatedRecord(listenerHA, records)) {
+            logger.error("The HA listener also fails!!!");
+            return false;
+        }
+        return true;
+    }
+
+    public void start() {
+        super.start();
+        scheduledReporter.start();
+    }
+
+    public void stop() {
+        super.stop();
+        scheduledReporter.interrupt();
+        try {
+            scheduledReporter.join();
+        } catch (InterruptedException e) {
+            logger.warn("Interrupted during join");
+            throw new RuntimeException(e);
+        }
+    }
+
+    class ReporterRunnable implements Runnable {
+
+        public void run() {
+            long startTime = System.currentTimeMillis();
+            while (isReady) {
+                if (size() <= 0) {
+                    logger.info("There's no record in the blocking queue.");
+                    sleep();
+                    startTime = System.currentTimeMillis();
+                    continue;
+                } else if (size() < MIN_REPORT_SIZE && 
(System.currentTimeMillis() - startTime < MAX_REPORT_TIME)) {
+                    logger.info("The number of records in the blocking queue 
is less than " + MIN_REPORT_SIZE + //
+                            " and the duration from last reporting is less 
than " + MAX_REPORT_TIME + "ms. Will delay to report!");
+                    sleep();
+                    continue;
+                }
+
+                onRecordUpdate(false);
+                startTime = System.currentTimeMillis();
+            }
+            onRecordUpdate(true);
+            logger.info("Reporter finishes reporting metrics.");
+        }
+
+        private void sleep() {
+            try {
+                Thread.sleep(60 * 1000);
+            } catch (InterruptedException e) {
+                logger.warn("Interrupted during running");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
new file mode 100644
index 0000000..a4719a5
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.List;
+
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * An active reservoir that keeps no buffer: each record handed to
+ * {@link #update(Record)} is forwarded to every registered listener
+ * immediately. If any listener reports a failure, the record is additionally
+ * handed to the HA (fallback) listener.
+ */
+public class InstantReservoir extends AbstractActiveReservoir {
+
+    private static final Logger logger = LoggerFactory.getLogger(InstantReservoir.class);
+
+    /**
+     * Forwards the record to the listeners, or drops it (with a log line) when
+     * the reservoir is not ready.
+     *
+     * @param record the record to publish
+     */
+    public void update(Record record) {
+        if (!isReady) {
+            logger.info("Current reservoir is not ready for update record");
+            return;
+        }
+        onRecordUpdate(record);
+    }
+
+    /**
+     * @return always 0, since nothing is ever buffered in this class
+     */
+    public int size() {
+        return 0;
+    }
+
+    /**
+     * Notifies every listener of the record; falls back to the HA listener
+     * when at least one of them fails.
+     */
+    private void onRecordUpdate(Record record) {
+        boolean ifSucceed = true;
+        for (ActiveReservoirListener listener : listeners) {
+            if (!notifyListenerOfUpdatedRecord(listener, record)) {
+                ifSucceed = false;
+                logger.warn("It fails to notify listener {} of updated record {}", listener, describeKey(record));
+            }
+        }
+        if (!ifSucceed) {
+            notifyListenerHAOfUpdatedRecord(record);
+        }
+    }
+
+    /**
+     * Delivers a single record to one listener, wrapped in a fresh mutable list.
+     *
+     * @return the listener's own success flag
+     */
+    private boolean notifyListenerOfUpdatedRecord(ActiveReservoirListener listener, Record record) {
+        List<Record> recordsList = Lists.newArrayList();
+        recordsList.add(record);
+        return listener.onRecordUpdate(recordsList);
+    }
+
+    /**
+     * Delivers the record to the HA listener.
+     *
+     * @return {@code true} if the HA listener accepted the record
+     */
+    private boolean notifyListenerHAOfUpdatedRecord(Record record) {
+        if (listenerHA == null) {
+            // Without this guard a missing fallback would surface as a NullPointerException.
+            logger.error("No HA listener is configured for updated record {}", describeKey(record));
+            return false;
+        }
+        logger.info("The HA listener {} for updated record {} will be started", listenerHA, describeKey(record));
+        if (!notifyListenerOfUpdatedRecord(listenerHA, record)) {
+            logger.error("The HA listener also fails!!!");
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Renders the record key for log output. The key is a byte array, so
+     * concatenating it directly would only print the array's identity hash.
+     */
+    private static String describeKey(Record record) {
+        byte[] key = record.getKey();
+        return key == null ? "null" : new String(key, java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
new file mode 100644
index 0000000..d87a648
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricRegistry;
+
+public class MetricsSystem extends MetricRegistry {
+    public static final MetricsSystem Metrics = new MetricsSystem();
+    private static final Logger logger = 
LoggerFactory.getLogger(MetricsSystem.class);
+    private final ConcurrentHashMap<String, ActiveReservoir> activeReservoirs;
+
+    private MetricsSystem() {
+        activeReservoirs = new ConcurrentHashMap<>();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                logger.info("Closing Metrics System");
+                try {
+                    shutdown();
+                } catch (IOException e) {
+                    logger.error("error during shutdown activeReservoirs and 
listeners", e);
+                }
+                logger.info("Closed Metrics System");
+            }
+        });
+    }
+
+    public void shutdown() throws IOException {
+        for (ActiveReservoir entry : activeReservoirs.values()) {
+            entry.close();
+        }
+    }
+
+    public ActiveReservoir activeReservoir(String name) {
+        return getOrAddActiveReservoir(name);
+    }
+
+    public ActiveReservoir register(String name, ActiveReservoir 
activeReservoir) {
+        if (name == null || activeReservoir == null) {
+            throw new IllegalArgumentException("neither of name or 
ActiveReservoir can be null");
+        }
+        final ActiveReservoir existingReservoir = 
activeReservoirs.putIfAbsent(name, activeReservoir);
+        if (existingReservoir == null) {
+            onActiveReservoirAdded(activeReservoir);
+        } else {
+            throw new IllegalArgumentException("An active reservoir named " + 
name + " already exists");
+        }
+
+        return activeReservoir;
+    }
+
+    /**
+     * Removes the active reservoir with the given name.
+     *
+     * @param name the name of the active reservoir
+     * @return whether or not the active reservoir was removed
+     */
+    public boolean removeActiveReservoir(String name) {
+        final ActiveReservoir recordReservoir = activeReservoirs.remove(name);
+        if (recordReservoir != null) {
+            onActiveReservoirRemoved(recordReservoir);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Removes all active reservoirs which match the given filter.
+     *
+     * @param filter a filter
+     */
+    public void removeActiveReservoirMatching(ActiveReservoirFilter filter) {
+        for (Map.Entry<String, ActiveReservoir> entry : 
activeReservoirs.entrySet()) {
+            if (filter.matches(entry.getKey(), entry.getValue())) {
+                removeActiveReservoir(entry.getKey());
+            }
+        }
+    }
+
+    private void onActiveReservoirAdded(ActiveReservoir activeReservoir) {
+        activeReservoir.start();
+    }
+
+    private void onActiveReservoirRemoved(ActiveReservoir activeReservoir) {
+        try {
+            activeReservoir.close();
+        } catch (IOException e) {
+        }
+    }
+
+    /**
+     * Returns a map of all the active reservoirs in the metrics system and 
their names.
+     *
+     * @return all the active reservoirs in the metrics system
+     */
+    public SortedMap<String, ActiveReservoir> getActiveReservoirs() {
+        return getActiveReservoirs(ActiveReservoirFilter.ALL);
+    }
+
+    /**
+     * Returns a map of all the active reservoirs in the metrics system and 
their names which match the given filter.
+     *
+     * @param filter    the active reservoir filter to match
+     * @return all the active reservoirs in the metrics system
+     */
+    public SortedMap<String, ActiveReservoir> 
getActiveReservoirs(ActiveReservoirFilter filter) {
+        final TreeMap<String, ActiveReservoir> reservoirs = new TreeMap<>();
+        for (Map.Entry<String, ActiveReservoir> entry : 
activeReservoirs.entrySet()) {
+            if (filter.matches(entry.getKey(), entry.getValue())) {
+                reservoirs.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return Collections.unmodifiableSortedMap(reservoirs);
+    }
+
+    private ActiveReservoir getOrAddActiveReservoir(String name) {
+        ActiveReservoir activeReservoir = activeReservoirs.get(name);
+        if (activeReservoir != null) {
+            return activeReservoir;
+        } else {
+            String defaultActiveReservoirClass = 
KylinConfig.getInstanceFromEnv().getMetricsActiveReservoirDefaultClass();
+            try {
+                activeReservoir = (ActiveReservoir) 
Class.forName(defaultActiveReservoirClass).getConstructor().newInstance();
+            } catch (Exception e) {
+                logger.warn("Failed to initialize the " + 
defaultActiveReservoirClass + ". The StubReservoir will be used");
+                activeReservoir = new StubReservoir();
+            }
+            return register(name, activeReservoir);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
new file mode 100644
index 0000000..f5bc797
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.metrics.lib.Record;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A metrics record backed by a {@code Map<String, Object>}. The reserved keys
+ * listed in {@link RecordReserveKeyEnum} hold the event type, id, host and
+ * timestamp; every {@link Map} operation is delegated to the backing map.
+ */
+public class RecordEvent implements Record, Map<String, Object>, Serializable {
+
+    /** Per-thread buffer reused by {@link #getValue()}. */
+    private static final ThreadLocal<ByteArrayOutputStream> _localBaos = new ThreadLocal<ByteArrayOutputStream>();
+
+    /** "hostname:address" of the local machine, or "Unknown" if it cannot be resolved. */
+    static String localHostname;
+    static {
+        try {
+            InetAddress addr = InetAddress.getLocalHost();
+            localHostname = addr.getHostName() + ":" + addr.getHostAddress();
+        } catch (UnknownHostException e) {
+            localHostname = "Unknown";
+        }
+    }
+
+    private final Map<String, Object> backingMap;
+
+    /** Wraps the given map as-is, without setting or validating reserved keys. */
+    private RecordEvent(Map<String, Object> map) {
+        this.backingMap = map;
+    }
+
+    public RecordEvent(String eventType) {
+        this(eventType, localHostname);
+    }
+
+    public RecordEvent(String eventType, long time) {
+        this(eventType, localHostname, time);
+    }
+
+    public RecordEvent(String eventType, String host) {
+        this(eventType, host, System.currentTimeMillis());
+    }
+
+    public RecordEvent(String eventType, String host, long time) {
+        this(null, eventType, host, time);
+    }
+
+    /**
+     * Creates an event on top of the given map and sets the reserved type,
+     * host and time entries.
+     *
+     * @param map           backing map, used directly (not copied); a new map is created when null
+     * @param eventType     mandatory, must not be null
+     * @param host          stored as given, no null check
+     * @param time          event timestamp in milliseconds
+     * @throws IllegalArgumentException if {@code eventType} is null
+     */
+    public RecordEvent(Map<String, Object> map, String eventType, String host, long time) {
+        backingMap = map != null ? map : Maps.<String, Object> newHashMap();
+        setEventType(eventType);
+        setHost(host);
+        setTime(time);
+    }
+
+    public String getEventType() {
+        return (String) get(RecordReserveKeyEnum.TYPE.toString());
+    }
+
+    private void setEventType(String eventType) {
+        if (eventType == null) {
+            throw new IllegalArgumentException("EventType cannot be null.");
+        }
+        put(RecordReserveKeyEnum.TYPE.toString(), eventType);
+    }
+
+    public String getHost() {
+        return (String) get(RecordReserveKeyEnum.HOST.toString());
+    }
+
+    private void setHost(String host) {
+        put(RecordReserveKeyEnum.HOST.toString(), host);
+    }
+
+    public Long getTime() {
+        return (Long) get(RecordReserveKeyEnum.TIME.toString());
+    }
+
+    private void setTime(Long time) {
+        if (time == null) {
+            throw new IllegalArgumentException("Time cannot be null.");
+        }
+        put(RecordReserveKeyEnum.TIME.toString(), time);
+    }
+
+    /** Overwrites the event timestamp with the current system time. */
+    public void resetTime() {
+        setTime(System.currentTimeMillis());
+    }
+
+    public String getID() {
+        return (String) get(RecordReserveKeyEnum.ID.toString());
+    }
+
+    public void setID(String id) {
+        put(RecordReserveKeyEnum.ID.toString(), id);
+    }
+
+    @Override
+    public void clear() {
+        backingMap.clear();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return backingMap.containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return backingMap.containsValue(value);
+    }
+
+    @Override
+    public Set<Entry<String, Object>> entrySet() {
+        return backingMap.entrySet();
+    }
+
+    /** Equal when it is the same instance or when the backing map equals {@code o}. */
+    @Override
+    public boolean equals(Object o) {
+        return super.equals(o) || backingMap.equals(o);
+    }
+
+    @Override
+    public Object get(Object key) {
+        return backingMap.get(key);
+    }
+
+    @Override
+    public int hashCode() {
+        return backingMap.hashCode();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return backingMap.isEmpty();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return backingMap.keySet();
+    }
+
+    @Override
+    public Object put(String key, Object value) {
+        return backingMap.put(key, value);
+    }
+
+    @Override
+    public void putAll(Map<? extends String, ? extends Object> t) {
+        backingMap.putAll(t);
+    }
+
+    @Override
+    public Object remove(Object key) {
+        return backingMap.remove(key);
+    }
+
+    @Override
+    public int size() {
+        return backingMap.size();
+    }
+
+    @Override
+    public String toString() {
+        return backingMap.toString();
+    }
+
+    @Override
+    public Collection<Object> values() {
+        return backingMap.values();
+    }
+
+    @Override
+    public String getType() {
+        return getEventType();
+    }
+
+    /**
+     * Builds the record key as "host-time-id". The bytes are encoded as UTF-8
+     * so the key does not depend on the platform default charset.
+     */
+    @Override
+    public byte[] getKey() {
+        return (getHost() + "-" + getTime() + "-" + getID()).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Returns a copy of the backing map without the event type entry; the
+     * event type is used for classification and is not part of the value.
+     */
+    @Override
+    public Map<String, Object> getValueRaw() {
+        Map<String, Object> cloneMap = Maps.newHashMap(backingMap);
+        cloneMap.remove(RecordReserveKeyEnum.TYPE.toString());
+        return cloneMap;
+    }
+
+    /**
+     * Serializes {@link #getValueRaw()} through {@code JsonUtil.writeValue}.
+     * Event type does not belong to value part, it's for classification.
+     */
+    @Override
+    public byte[] getValue() {
+        try {
+            ByteArrayOutputStream baos = _localBaos.get();
+            if (baos == null) {
+                baos = new ByteArrayOutputStream();
+                _localBaos.set(baos);
+            }
+            // The buffer is shared per thread, so discard whatever the previous call left behind.
+            baos.reset();
+            JsonUtil.writeValue(baos, getValueRaw());
+            return baos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);//in mem, should not happen
+        }
+    }
+
+    /** Returns a new event backed by a shallow copy of this event's map. */
+    @Override
+    public RecordEvent clone() {
+        Map<String, Object> cloneMap = Maps.newHashMap();
+        cloneMap.putAll(backingMap);
+        return new RecordEvent(cloneMap);
+    }
+
+    /** Reserved map keys; {@link #toString()} yields the key string stored in the map. */
+    public enum RecordReserveKeyEnum {
+        TYPE("EVENT_TYPE"), ID("EVENT_ID"), HOST("HOST"), TIME("KTIMESTAMP");
+
+        private final String reserveKey;
+
+        private RecordReserveKeyEnum(String key) {
+            this.reserveKey = key;
+        }
+
+        @Override
+        public String toString() {
+            return reserveKey;
+        }
+
+        /**
+         * Finds the constant whose reserved key equals the given string.
+         *
+         * @param key the reserved key string, e.g. "EVENT_TYPE"
+         * @return the matching constant, or null when there is none
+         */
+        public RecordReserveKeyEnum getByKey(String key) {
+            for (RecordReserveKeyEnum reserveKey : RecordReserveKeyEnum.values()) {
+                // Compare by value; reference equality only matched interned strings.
+                if (reserveKey.reserveKey.equals(key)) {
+                    return reserveKey;
+                }
+            }
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
new file mode 100644
index 0000000..ff97b9b
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+import org.apache.kylin.common.KylinConfig;
+
+/**
+ * Breaks a timestamp down into the calendar fields attached to metrics
+ * events, evaluated in the time zone obtained from
+ * {@code KylinConfig.getInstanceFromEnv().getTimeZone()}.
+ */
+public class RecordEventTimeDetail {
+    private static final TimeZone timeZone;
+    // SimpleDateFormat is not thread-safe, hence one formatter instance per thread.
+    private static final ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new ThreadLocal<SimpleDateFormat>();
+    private static final ThreadLocal<SimpleDateFormat> timeFormatThreadLocal = new ThreadLocal<SimpleDateFormat>();
+
+    static {
+        timeZone = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone());
+    }
+
+    /** First day of the timestamp's year, "yyyy-01-01". */
+    public final String year_begin_date;
+    /** First day of the timestamp's month, "yyyy-MM-01". */
+    public final String month_begin_date;
+    /** Date of the timestamp, "yyyy-MM-dd". */
+    public final String date;
+    /** Time of day of the timestamp, "HH:mm:ss". */
+    public final String time;
+    public final int hour;
+    public final int minute;
+    public final int second;
+    /** Date of the Sunday on or before the timestamp, "yyyy-MM-dd". */
+    public final String week_begin_date;
+
+    /**
+     * @param timeStamp milliseconds since the epoch
+     */
+    public RecordEventTimeDetail(long timeStamp) {
+        Calendar calendar = Calendar.getInstance(timeZone);
+        calendar.setTimeInMillis(timeStamp);
+
+        SimpleDateFormat dateFormat = dateFormatThreadLocal.get();
+        if (dateFormat == null) {
+            dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+            dateFormat.setTimeZone(timeZone);
+            dateFormatThreadLocal.set(dateFormat);
+        }
+        SimpleDateFormat timeFormat = timeFormatThreadLocal.get();
+        if (timeFormat == null) {
+            timeFormat = new SimpleDateFormat("HH:mm:ss");
+            timeFormat.setTimeZone(timeZone);
+            timeFormatThreadLocal.set(timeFormat);
+        }
+
+        String yearStr = String.format("%04d", calendar.get(Calendar.YEAR));
+        // Calendar.MONTH is zero-based.
+        String monthStr = String.format("%02d", calendar.get(Calendar.MONTH) + 1);
+        this.year_begin_date = yearStr + "-01-01";
+        this.month_begin_date = yearStr + "-" + monthStr + "-01";
+        this.date = dateFormat.format(calendar.getTime());
+        this.time = timeFormat.format(calendar.getTime());
+        this.hour = calendar.get(Calendar.HOUR_OF_DAY);
+        this.minute = calendar.get(Calendar.MINUTE);
+        this.second = calendar.get(Calendar.SECOND);
+
+        // Step back to Sunday using calendar arithmetic. Subtracting fixed
+        // 24-hour blocks from the timestamp can land on the wrong date when a
+        // daylight-saving transition lies in between (days of 23 or 25 hours).
+        int daysSinceSunday = calendar.get(Calendar.DAY_OF_WEEK) - Calendar.SUNDAY;
+        calendar.add(Calendar.DAY_OF_MONTH, -daysSinceSunday);
+        this.week_begin_date = dateFormat.format(calendar.getTime());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
new file mode 100644
index 0000000..7031129
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.io.Serializable;
+
+import org.apache.kylin.metrics.lib.Record;
+
+/**
+ * Holder around a {@link RecordEvent} that enriches the event with the time
+ * properties derived from its timestamp (see {@link RecordEventTimeDetail}).
+ */
+public class RecordEventWrapper implements Serializable {
+
+    protected final RecordEvent metricsEvent;
+
+    /**
+     * Wraps the event and immediately writes the derived time properties into it.
+     *
+     * @param metricsEvent the event to wrap; it is modified in place
+     */
+    public RecordEventWrapper(RecordEvent metricsEvent) {
+        this.metricsEvent = metricsEvent;
+        addTimeDetails();
+    }
+
+    /** Computes the time breakdown of the event's timestamp and stores each part on the event. */
+    private void addTimeDetails() {
+        final RecordEventTimeDetail detail = new RecordEventTimeDetail(metricsEvent.getTime());
+
+        // Date-level properties
+        metricsEvent.put(TimePropertyEnum.YEAR.toString(), detail.year_begin_date);
+        metricsEvent.put(TimePropertyEnum.MONTH.toString(), detail.month_begin_date);
+        metricsEvent.put(TimePropertyEnum.WEEK_BEGIN_DATE.toString(), detail.week_begin_date);
+        metricsEvent.put(TimePropertyEnum.DAY_DATE.toString(), detail.date);
+
+        // Time-of-day properties
+        metricsEvent.put(TimePropertyEnum.DAY_TIME.toString(), detail.time);
+        metricsEvent.put(TimePropertyEnum.TIME_HOUR.toString(), detail.hour);
+        metricsEvent.put(TimePropertyEnum.TIME_MINUTE.toString(), detail.minute);
+        metricsEvent.put(TimePropertyEnum.TIME_SECOND.toString(), detail.second);
+    }
+
+    /** Resets the event's timestamp and recomputes the derived time properties. */
+    public void resetTime() {
+        metricsEvent.resetTime();
+        addTimeDetails();
+    }
+
+    /**
+     * @return the wrapped event, viewed as a {@link Record}
+     */
+    public Record getMetricsRecord() {
+        return metricsEvent;
+    }
+
+    @Override
+    public String toString() {
+        return metricsEvent.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
new file mode 100644
index 0000000..22fadd3
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.Properties;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+
+/**
+ * Base class for builders of {@link ActiveReservoirReporter}s. It holds the
+ * target reservoir and a private copy of the configuration properties.
+ */
+public abstract class ReporterBuilder {
+    protected final ActiveReservoir registry;
+    protected final Properties props;
+
+    /**
+     * @param activeReservoir the reservoir stored in {@link #registry}
+     */
+    protected ReporterBuilder(ActiveReservoir activeReservoir) {
+        this.props = new Properties();
+        this.registry = activeReservoir;
+    }
+
+    /**
+     * Merges the given properties into this builder's configuration.
+     *
+     * @param props properties to copy in; ignored when null
+     * @return this builder
+     */
+    public ReporterBuilder setConfig(Properties props) {
+        if (props == null) {
+            return this;
+        }
+        this.props.putAll(props);
+        return this;
+    }
+
+    /**
+     * Builds a {@link ActiveReservoirReporter} with the given properties.
+     *
+     * @return a {@link ActiveReservoirReporter}
+     */
+    public abstract ActiveReservoirReporter build() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java
new file mode 100644
index 0000000..fe69dec
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+
+/**
+ * No-op {@link ActiveReservoir}: every operation has an empty body and
+ * {@link #size()} always reports 0, so records passed to it are discarded.
+ */
+public class StubReservoir implements ActiveReservoir {
+
+    /** Does nothing; the listener is not stored. */
+    public void addListener(ActiveReservoirListener listener) {
+    }
+
+    /** Does nothing. */
+    public void removeListener(ActiveReservoirListener listener) {
+    }
+
+    /** Does nothing. */
+    public void removeAllListener() {
+    }
+
+    /** Does nothing; the listener is not stored. */
+    public void setHAListener(ActiveReservoirListener listener) {
+    }
+
+    /** Does nothing; the record is dropped. */
+    public void update(Record record) {
+    }
+
+    /** @return always 0 */
+    public int size() {
+        return 0;
+    }
+
+    /** Does nothing. */
+    public void start() {
+    }
+
+    /** Does nothing. */
+    public void stop() {
+    }
+
+    /** Does nothing. */
+    public void close() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java
new file mode 100644
index 0000000..5e0e637
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.List;
+
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+
+/**
+ * No-op {@link ActiveReservoirReporter}: start, stop and close have empty
+ * bodies, and its listener accepts every batch without doing anything.
+ */
+public class StubReservoirReporter extends ActiveReservoirReporter {
+
+    /** Suffix identifying this reporter, e.g. when building names from it. */
+    public static final String STUB_REPORTER_SUFFIX = "STUB";
+
+    // NOTE(review): the field is public but its declared type is a private inner class.
+    public final StubReservoirListener listener = new StubReservoirListener();
+
+    /** Does nothing. */
+    public void start() {
+    }
+
+    /** Does nothing. */
+    public void stop() {
+    }
+
+    /** Does nothing. */
+    public void close() {
+    }
+
+    /** Listener that ignores the records it receives and always reports success. */
+    private class StubReservoirListener implements ActiveReservoirListener {
+
+        /** @return always {@code true}; the records are not inspected */
+        public boolean onRecordUpdate(final List<Record> records) {
+            return true;
+        }
+
+        /** Does nothing. */
+        public void close() {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java
new file mode 100644
index 0000000..676c59c
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import static 
org.apache.kylin.metrics.lib.ActiveReservoirReporter.KYLIN_PREFIX;
+import static 
org.apache.kylin.metrics.lib.impl.StubReservoirReporter.STUB_REPORTER_SUFFIX;
+
+import org.apache.kylin.metrics.lib.Sink;
+
+/**
+ * Stub {@link Sink} that derives a table name from a subject by plain string
+ * concatenation.
+ */
+public class StubSink implements Sink {
+    /**
+     * Builds the table name for the given subject.
+     *
+     * @param subject the subject to map to a table name
+     * @return {@code KYLIN_PREFIX}, a dot, {@code STUB_REPORTER_SUFFIX}, an underscore and the subject, in that order
+     */
+    public String getTableFromSubject(String subject) {
+        final StringBuilder table = new StringBuilder();
+        table.append(KYLIN_PREFIX).append(".");
+        table.append(STUB_REPORTER_SUFFIX).append("_");
+        table.append(subject);
+        return table.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
new file mode 100644
index 0000000..eb9a395
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import com.google.common.base.Strings;
+
+/**
+ * Time-related properties, each bound to the property name that
+ * {@link #toString()} returns.
+ */
+public enum TimePropertyEnum {
+    YEAR("KYEAR_BEGIN_DATE"), //
+    MONTH("KMONTH_BEGIN_DATE"), //
+    WEEK_BEGIN_DATE("KWEEK_BEGIN_DATE"), //
+    DAY_DATE("KDAY_DATE"), //
+    DAY_TIME("KDAY_TIME"), //
+    TIME_HOUR("KTIME_HOUR"), //
+    TIME_MINUTE("KTIME_MINUTE"), //
+    TIME_SECOND("KTIME_SECOND");
+
+    private final String propertyName;
+
+    TimePropertyEnum(String propertyName) {
+        this.propertyName = propertyName;
+    }
+
+    /**
+     * Looks up a constant by its property name, ignoring the case of the argument.
+     *
+     * @param propertyName the property name to look for; may be {@code null}
+     * @return the matching constant, or {@code null} when the argument is
+     *         {@code null}, empty, or matches no constant
+     */
+    public static TimePropertyEnum getByPropertyName(String propertyName) {
+        if (Strings.isNullOrEmpty(propertyName)) {
+            return null;
+        }
+        // Upper-case once, with a fixed locale: the default-locale variant maps 'i' to a
+        // dotted capital under e.g. a Turkish locale and would then never match the constants.
+        final String normalized = propertyName.toUpperCase(java.util.Locale.ROOT);
+        for (TimePropertyEnum property : TimePropertyEnum.values()) {
+            if (property.propertyName.equals(normalized)) {
+                return property;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return the property name of this constant, not the constant's identifier
+     */
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/metrics-reporter-hive/pom.xml
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/pom.xml b/metrics-reporter-hive/pom.xml
new file mode 100644
index 0000000..4c49acb
--- /dev/null
+++ b/metrics-reporter-hive/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-metrics-reporter-hive</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Metrics Reporter Hive</name>
+    <description>Apache Kylin - Metrics Reporter Hive</description>
+
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>2.1.0</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-hcatalog-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
----------------------------------------------------------------------
diff --git 
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
 
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
new file mode 100644
index 0000000..2ab0090
--- /dev/null
+++ 
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
+import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class HiveProducer {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(HiveProducer.class);
+
+    private static final int CACHE_MAX_SIZE = 10;
+
+    private final HiveConf hiveConf;
+    private final FileSystem hdfs;
+    private final LoadingCache<Pair<String, String>, Pair<String, 
List<FieldSchema>>> tableFieldSchemaCache;
+    private final String CONTENT_FILE_NAME;
+
+    public HiveProducer(Properties props) throws Exception {
+        hiveConf = new HiveConf();
+        hdfs = FileSystem.get(hiveConf);
+
+        for (Map.Entry<Object, Object> e : props.entrySet()) {
+            hiveConf.set(e.getKey().toString(), e.getValue().toString());
+        }
+
+        tableFieldSchemaCache = CacheBuilder.newBuilder().removalListener(new 
RemovalListener<Pair<String, String>, Pair<String, List<FieldSchema>>>() {
+            @Override
+            public void onRemoval(RemovalNotification<Pair<String, String>, 
Pair<String, List<FieldSchema>>> notification) {
+                logger.info("Field schema with table " + 
ActiveReservoirReporter.getTableName(notification.getKey()) + " is removed due 
to " + notification.getCause());
+            }
+        }).maximumSize(CACHE_MAX_SIZE).build(new CacheLoader<Pair<String, 
String>, Pair<String, List<FieldSchema>>>() {
+            @Override
+            public Pair<String, List<FieldSchema>> load(Pair<String, String> 
tableName) throws Exception {
+                HiveMetaStoreClient metaStoreClient = new 
HiveMetaStoreClient(hiveConf);
+                String tableLocation = 
metaStoreClient.getTable(tableName.getFirst(), 
tableName.getSecond()).getSd().getLocation();
+                List<FieldSchema> fields = 
metaStoreClient.getFields(tableName.getFirst(), tableName.getSecond());
+                metaStoreClient.close();
+                return new Pair(tableLocation, fields);
+            }
+        });
+
+        String hostName;
+        try {
+            hostName = InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            hostName = "UNKNOWN";
+        }
+        CONTENT_FILE_NAME = hostName + "-part-0000";
+    }
+
+    public void close() {
+        tableFieldSchemaCache.cleanUp();
+    }
+
+    public void send(final Record record) throws Exception {
+        HiveProducerRecord hiveRecord = convertTo(record);
+        write(hiveRecord.key(), Lists.newArrayList(hiveRecord));
+    }
+
+    public void send(final List<Record> recordList) throws Exception {
+        Map<RecordKey, List<HiveProducerRecord>> recordMap = Maps.newHashMap();
+        for (Record record : recordList) {
+            HiveProducerRecord hiveRecord = convertTo(record);
+            if (recordMap.get(hiveRecord.key()) == null) {
+                recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> 
newLinkedList());
+            }
+            recordMap.get(hiveRecord.key()).add(hiveRecord);
+        }
+
+        for (Map.Entry<RecordKey, List<HiveProducerRecord>> entry : 
recordMap.entrySet()) {
+            write(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private void write(RecordKey recordKey, Iterable<HiveProducerRecord> 
recordItr) throws Exception {
+        String tableLocation = tableFieldSchemaCache.get(new 
Pair(recordKey.database(), recordKey.table())).getFirst();
+        StringBuilder sb = new StringBuilder();
+        sb.append(tableLocation);
+        for (Map.Entry<String, String> e : recordKey.partition().entrySet()) {
+            sb.append("/");
+            sb.append(e.getKey().toLowerCase());
+            sb.append("=");
+            sb.append(e.getValue());
+        }
+        Path partitionPath = new Path(sb.toString());
+        if (!hdfs.exists(partitionPath)) {
+            StringBuilder hql = new StringBuilder();
+            hql.append("ALTER TABLE ");
+            hql.append(recordKey.database() + "." + recordKey.table());
+            hql.append(" ADD IF NOT EXISTS PARTITION (");
+            boolean ifFirst = true;
+            for (Map.Entry<String, String> e : 
recordKey.partition().entrySet()) {
+                if (ifFirst) {
+                    ifFirst = false;
+                } else {
+                    hql.append(",");
+                }
+                hql.append(e.getKey().toLowerCase());
+                hql.append("='" + e.getValue() + "'");
+            }
+            hql.append(")");
+            Driver driver = new Driver(hiveConf);
+            SessionState.start(new CliSessionState(hiveConf));
+            driver.run(hql.toString());
+            driver.close();
+        }
+        Path partitionContentPath = new Path(partitionPath, CONTENT_FILE_NAME);
+        if (!hdfs.exists(partitionContentPath)) {
+            int nRetry = 0;
+            while (!hdfs.createNewFile(partitionContentPath) && nRetry++ < 5) {
+                if (hdfs.exists(partitionContentPath)) {
+                    break;
+                }
+                Thread.sleep(500L * nRetry);
+            }
+            if (!hdfs.exists(partitionContentPath)) {
+                throw new RuntimeException("Fail to create HDFS file: " + 
partitionContentPath + " after " + nRetry + " retries");
+            }
+        }
+        FSDataOutputStream fout = hdfs.append(partitionContentPath);
+        try {
+            for (HiveProducerRecord elem : recordItr) {
+                fout.writeBytes(elem.valueToString() + "\n");
+            }
+        } catch (IOException e) {
+            logger.error("Fails to write metrics to file " + 
partitionContentPath.toString());
+        } finally {
+            IOUtils.closeQuietly(fout);
+        }
+    }
+
+    private HiveProducerRecord convertTo(Record record) throws Exception {
+        Map<String, Object> rawValue = record.getValueRaw();
+
+        //Set partition values for hive table
+        Map<String, String> partitionKVs = Maps.newHashMapWithExpectedSize(1);
+        partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), 
rawValue.get(TimePropertyEnum.DAY_DATE.toString()).toString());
+
+        return 
parseToHiveProducerRecord(HiveReservoirReporter.getTableFromSubject(record.getType()),
 partitionKVs, rawValue);
+    }
+
+    public HiveProducerRecord parseToHiveProducerRecord(String tableName, 
Map<String, String> partitionKVs, Map<String, Object> rawValue) throws 
Exception {
+        Pair<String, String> tableNameSplits = 
ActiveReservoirReporter.getTableNameSplits(tableName);
+        List<FieldSchema> fields = 
tableFieldSchemaCache.get(tableNameSplits).getSecond();
+        List<Object> columnValues = 
Lists.newArrayListWithExpectedSize(fields.size());
+        for (FieldSchema fieldSchema : fields) {
+            
columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase()));
+        }
+
+        return new HiveProducerRecord(tableNameSplits.getFirst(), 
tableNameSplits.getSecond(), partitionKVs, columnValues);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e3d8ff96/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
----------------------------------------------------------------------
diff --git 
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
 
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
new file mode 100644
index 0000000..8bf93ec
--- /dev/null
+++ 
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Strings;
+
+public class HiveProducerRecord {
+
+    public static final String DELIMITER = ",";
+
+    private final RecordKey key;
+    private final List<Object> value;
+
+    public HiveProducerRecord(String dbName, String tableName, Map<String, 
String> partitionKVs, List<Object> value) {
+        this.key = new RecordKey(dbName, tableName, partitionKVs);
+        this.value = value;
+    }
+
+    public HiveProducerRecord(String tableName, Map<String, String> 
partitionKVs, List<Object> value) {
+        this.key = new RecordKey(tableName, partitionKVs);
+        this.value = value;
+    }
+
+    public HiveProducerRecord(String dbName, String tableName, List<Object> 
value) {
+        this.key = new RecordKey(dbName, tableName);
+        this.value = value;
+    }
+
+    public HiveProducerRecord(String tableName, List<Object> value) {
+        this.key = new RecordKey(tableName);
+        this.value = value;
+    }
+
+    public RecordKey key() {
+        return this.key;
+    }
+
+    public List<Object> value() {
+        return this.value;
+    }
+
+    public String toString() {
+        String value = this.value == null ? "null" : this.value.toString();
+        return "HiveProducerRecord(key=" + this.key.toString() + ", value=" + 
value + ")";
+    }
+
+    public String valueToString() {
+        if (this.value == null || value.isEmpty()) {
+            return null;
+        }
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < value.size() - 1; i++) {
+            sb.append(value.get(i) + DELIMITER);
+        }
+        sb.append(value.get(value.size() - 1));
+        return sb.toString();
+    }
+
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        } else if (!(o instanceof HiveProducerRecord)) {
+            return false;
+        } else {
+            HiveProducerRecord that = (HiveProducerRecord) o;
+            if (this.key != null) {
+                if (!this.key.equals(that.key)) {
+                    return false;
+                }
+            } else if (that.key != null) {
+                return false;
+            }
+            if (this.value != null) {
+                if (!this.value.equals(that.value)) {
+                    return false;
+                }
+            } else if (that.value != null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public int hashCode() {
+        int result = this.key != null ? this.key.hashCode() : 0;
+        result = 31 * result + (this.value != null ? this.value.hashCode() : 
0);
+        return result;
+    }
+
+    public class RecordKey {
+        public static final String DEFAULT_DB_NAME = "DEFAULT";
+
+        private final String dbName;
+        private final String tableName;
+        private final Map<String, String> partitionKVs;
+
+        public RecordKey(String dbName, String tableName, Map<String, String> 
partitionKVs) {
+            if (Strings.isNullOrEmpty(dbName)) {
+                this.dbName = DEFAULT_DB_NAME;
+            } else {
+                this.dbName = dbName;
+            }
+            this.tableName = tableName;
+            this.partitionKVs = partitionKVs;
+        }
+
+        public RecordKey(String tableName, Map<String, String> partitionKVs) {
+            this(null, tableName, partitionKVs);
+        }
+
+        public RecordKey(String dbName, String tableName) {
+            this(dbName, tableName, null);
+        }
+
+        public RecordKey(String tableName) {
+            this(null, tableName, null);
+        }
+
+        public String database() {
+            return this.dbName;
+        }
+
+        public String table() {
+            return this.tableName;
+        }
+
+        public Map<String, String> partition() {
+            return this.partitionKVs;
+        }
+
+        public String toString() {
+            String partitionKVs = this.partitionKVs == null ? "null" : 
this.partitionKVs.toString();
+            return "RecordKey(database=" + this.dbName + ", table=" + 
this.tableName + ", partition=" + partitionKVs + ")";
+        }
+
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            } else if (!(o instanceof RecordKey)) {
+                return false;
+            } else {
+                RecordKey that = (RecordKey) o;
+                if (this.dbName != null) {
+                    if (!this.dbName.equals(that.dbName)) {
+                        return false;
+                    }
+                } else if (that.dbName != null) {
+                    return false;
+                }
+
+                if (this.tableName != null) {
+                    if (!this.tableName.equals(that.tableName)) {
+                        return false;
+                    }
+                } else if (that.tableName != null) {
+                    return false;
+                }
+
+                if (this.partitionKVs != null) {
+                    if (!this.partitionKVs.equals(that.partitionKVs)) {
+                        return false;
+                    }
+                } else if (that.partitionKVs != null) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        public int hashCode() {
+            int result = this.dbName != null ? this.dbName.hashCode() : 0;
+            result = 31 * result + (this.tableName != null ? 
this.tableName.hashCode() : 0);
+            result = 31 * result + (this.partitionKVs != null ? 
this.partitionKVs.hashCode() : 0);
+            return result;
+        }
+    }
+}

Reply via email to