APACHE-KYLIN-2723: Introduce metrics collector for query & job metrics

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8261b3a3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8261b3a3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8261b3a3

Branch: refs/heads/yaho-cube-planner
Commit: 8261b3a351e52cd935cdc551d2eaebd7f7c54e18
Parents: e3d8ff9
Author: Zhong <nju_y...@apache.org>
Authored: Tue Aug 8 23:57:22 2017 +0800
Committer: Zhong <nju_y...@apache.org>
Committed: Tue Aug 8 23:57:22 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   9 ++
 .../apache/kylin/metrics/MetricsManager.java    | 142 +++++++++++++++++++
 .../job/ExceptionRecordEventWrapper.java        |  40 ++++++
 .../kylin/metrics/job/JobPropertyEnum.java      |  56 ++++++++
 .../metrics/job/JobRecordEventWrapper.java      |  83 +++++++++++
 .../query/CubeSegmentRecordEventWrapper.java    | 123 ++++++++++++++++
 .../metrics/query/QueryRecordEventWrapper.java  | 103 ++++++++++++++
 .../metrics/query/RPCRecordEventWrapper.java    |  82 +++++++++++
 pom.xml                                         |  15 ++
 server-base/pom.xml                             |  12 ++
 server/src/main/resources/kylinMetrics.xml      |  84 +++++++++++
 server/src/main/webapp/WEB-INF/web.xml          |   1 +
 .../kylin/rest/service/ServiceTestBase.java     |   3 +-
 13 files changed, 752 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 41a967f..8d4ee20 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1124,8 +1124,17 @@ abstract public class KylinConfigBase implements 
Serializable {
     // 
============================================================================
     // Metrics
     // 
============================================================================
+    public boolean isMetricsMonitorEnabled() {
+        return 
Boolean.parseBoolean(getOptional("kylin.core.metrics.monitor-enabled", 
"false"));
+    }
+
     public String getMetricsActiveReservoirDefaultClass() {
         return getOptional("kylin.core.metrics.active-reservoir-default-class",
                 "org.apache.kylin.metrics.lib.impl.StubReservoir");
     }
+
+    public String getSystemCubeSinkDefaultClass() {
+        return getOptional("kylin.core.metrics.system-cube-sink-default-class",
+                "org.apache.kylin.metrics.lib.impl.hive.HiveSink");
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
new file mode 100644
index 0000000..8899f07
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics;
+
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.Metrics;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.Sink;
+import org.apache.kylin.metrics.lib.impl.MetricsSystem;
+import org.apache.kylin.metrics.lib.impl.ReporterBuilder;
+import org.apache.kylin.metrics.lib.impl.StubSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class MetricsManager {
+
+    public static final String SYSTEM_PROJECT = "KYLIN_SYSTEM";
+    private static final Logger logger = 
LoggerFactory.getLogger(MetricsManager.class);
+    private static final MetricsManager instance = new MetricsManager();
+    private static final String METHOD_FOR_REGISTRY = "forRegistry";
+    private static Map<ActiveReservoir, List<Pair<Class<? extends 
ActiveReservoirReporter>, Properties>>> sourceReporterBindProps = Maps
+            .newHashMap();
+    private static Sink scSink;
+    private final Set<String> activeReservoirPointers;
+
+    private MetricsManager() {
+        activeReservoirPointers = Sets.newHashSet();
+    }
+
+    public static MetricsManager getInstance() {
+        return instance;
+    }
+
+    public static void setSystemCubeSink(Sink systemCubeSink) {
+        scSink = systemCubeSink;
+    }
+
+    public static void setSourceReporterBindProps(
+            Map<ActiveReservoir, List<Pair<String, Properties>>> 
sourceReporterBindProperties) {
+        sourceReporterBindProps = 
Maps.newHashMapWithExpectedSize(sourceReporterBindProperties.size());
+        for (ActiveReservoir activeReservoir : 
sourceReporterBindProperties.keySet()) {
+            List<Pair<Class<? extends ActiveReservoirReporter>, Properties>> 
values = Lists
+                    
.newArrayListWithExpectedSize(sourceReporterBindProperties.get(activeReservoir).size());
+            sourceReporterBindProps.put(activeReservoir, values);
+            for (Pair<String, Properties> entry : 
sourceReporterBindProperties.get(activeReservoir)) {
+                try {
+                    Class clz = Class.forName(entry.getKey());
+                    if (ActiveReservoirReporter.class.isAssignableFrom(clz)) {
+                        values.add(new Pair(clz, entry.getValue()));
+                    } else {
+                        logger.warn("The class " + clz + " is not a sub class 
of " + ActiveReservoir.class);
+                    }
+                } catch (ClassNotFoundException e) {
+                    logger.warn("Cannot find class " + entry.getKey());
+                }
+            }
+        }
+    }
+
+    public void init() {
+        if (scSink == null) {
+            logger.warn("SystemCubeSink is not set and the default one will be 
chosen");
+            try {
+                Class clz = 
Class.forName(KylinConfig.getInstanceFromEnv().getSystemCubeSinkDefaultClass());
+                scSink = (Sink) clz.getConstructor().newInstance();
+            } catch (Exception e) {
+                logger.warn(
+                        "Failed to initialize the " + 
KylinConfig.getInstanceFromEnv().getSystemCubeSinkDefaultClass()
+                                + ". The StubSink will be used");
+                scSink = new StubSink();
+            }
+        }
+
+        if (KylinConfig.getInstanceFromEnv().isMetricsMonitorEnabled()) {
+            logger.info("Kylin metrics monitor is enabled.");
+            int nameIdx = 0;
+            for (ActiveReservoir activeReservoir : 
sourceReporterBindProps.keySet()) {
+                String registerName = MetricsSystem.name(MetricsManager.class,
+                        "-" + nameIdx + "-" + activeReservoir.toString());
+                activeReservoirPointers.add(registerName);
+                List<Pair<Class<? extends ActiveReservoirReporter>, 
Properties>> reportProps = sourceReporterBindProps
+                        .get(activeReservoir);
+                for (Pair<Class<? extends ActiveReservoirReporter>, 
Properties> subEntry : reportProps) {
+                    try {
+                        Method method = 
subEntry.getKey().getMethod(METHOD_FOR_REGISTRY, ActiveReservoir.class);
+                        ((ReporterBuilder) method.invoke(null, 
activeReservoir)).setConfig(subEntry.getValue()).build()
+                                .start();
+                    } catch (Exception e) {
+                        logger.warn("Cannot initialize 
ActiveReservoirReporter: Builder class - " + subEntry.getKey()
+                                + ", Properties - " + subEntry.getValue());
+                    }
+                }
+                Metrics.register(registerName, activeReservoir);
+            }
+            Preconditions.checkArgument(activeReservoirPointers.size() == 
sourceReporterBindProps.keySet().size(),
+                    "Duplicate register names exist!!!");
+        } else {
+            logger.info("Kylin metrics monitor is not enabled!!!");
+        }
+    }
+
+    public void update(Record record) {
+        for (String registerName : activeReservoirPointers) {
+            Metrics.activeReservoir(registerName).update(record);
+        }
+    }
+
+    public String getSystemTableFromSubject(String subject) {
+        return scSink.getTableFromSubject(subject);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/core-metrics/src/main/java/org/apache/kylin/metrics/job/ExceptionRecordEventWrapper.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/job/ExceptionRecordEventWrapper.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/job/ExceptionRecordEventWrapper.java
new file mode 100644
index 0000000..8d56025
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/job/ExceptionRecordEventWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.job;
+
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
+
+public class ExceptionRecordEventWrapper extends RecordEventWrapper {
+
+    public ExceptionRecordEventWrapper(RecordEvent metricsEvent) {
+        super(metricsEvent);
+    }
+
+    public <T extends Throwable> void setWrapper(String projectName, String 
cubeName, String jobId, String jobType,
+            String cubingType, Class<T> exceptionClassName) {
+        this.metricsEvent.put(JobPropertyEnum.PROJECT.toString(), projectName);
+        this.metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName);
+        this.metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), jobId);
+        this.metricsEvent.put(JobPropertyEnum.TYPE.toString(), jobType);
+        this.metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), 
cubingType);
+        this.metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), 
exceptionClassName.getName());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobPropertyEnum.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobPropertyEnum.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobPropertyEnum.java
new file mode 100644
index 0000000..be32424
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobPropertyEnum.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.job;
+
+import com.google.common.base.Strings;
+
+public enum JobPropertyEnum {
+    ID_CODE("JOB_ID"), PROJECT("PROJECT"), CUBE("CUBE_NAME"), 
TYPE("JOB_TYPE"), ALGORITHM("CUBING_TYPE"), STATUS(
+            "JOB_STATUS"), EXCEPTION("EXCEPTION"), //
+    SOURCE_SIZE("TABLE_SIZE"), CUBE_SIZE("CUBE_SIZE"), 
BUILD_DURATION("DURATION"), WAIT_RESOURCE_TIME(
+            "WAIT_RESOURCE_TIME"), PER_BYTES_TIME_COST("PER_BYTES_TIME_COST"), 
STEP_DURATION_DISTINCT_COLUMNS(
+                    "STEP_DURATION_DISTINCT_COLUMNS"), 
STEP_DURATION_DICTIONARY(
+                            "STEP_DURATION_DICTIONARY"), 
STEP_DURATION_INMEM_CUBING(
+                                    "STEP_DURATION_INMEM_CUBING"), 
STEP_DURATION_HFILE_CONVERT(
+                                            "STEP_DURATION_HFILE_CONVERT");
+
+    private final String propertyName;
+
+    JobPropertyEnum(String name) {
+        this.propertyName = name;
+    }
+
+    public static JobPropertyEnum getByName(String name) {
+        if (Strings.isNullOrEmpty(name)) {
+            return null;
+        }
+        for (JobPropertyEnum property : JobPropertyEnum.values()) {
+            if (property.propertyName.equals(name.toUpperCase())) {
+                return property;
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
new file mode 100644
index 0000000..537388c
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/job/JobRecordEventWrapper.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.job;
+
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
+
+public class JobRecordEventWrapper extends RecordEventWrapper {
+
+    public static final long MIN_SOURCE_SIZE = 33554432L; //32MB per block 
created by the first step
+
+    public JobRecordEventWrapper(RecordEvent metricsEvent) {
+        super(metricsEvent);
+        initStats();
+    }
+
+    public void initStats() {
+        this.metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), 0L);
+        this.metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), 0L);
+        this.metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), 0L);
+        this.metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), 
0L);
+        
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(),
 0L);
+        
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), 0L);
+        
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), 
0L);
+        
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), 
0L);
+        setDependentStats();
+    }
+
+    public void setWrapper(String projectName, String cubeName, String jobId, 
String jobType, String cubingType) {
+        this.metricsEvent.put(JobPropertyEnum.PROJECT.toString(), projectName);
+        this.metricsEvent.put(JobPropertyEnum.CUBE.toString(), cubeName);
+        this.metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), jobId);
+        this.metricsEvent.put(JobPropertyEnum.TYPE.toString(), jobType);
+        this.metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), 
cubingType);
+    }
+
+    public void setStats(long tableSize, long cubeSize, long buildDuration, 
long waitResourceTime) {
+        this.metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), 
tableSize);
+        this.metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), cubeSize);
+        this.metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), 
buildDuration);
+        this.metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), 
waitResourceTime);
+        setDependentStats();
+    }
+
+    public void setStepStats(long dColumnDistinct, long dDictBuilding, long 
dCubingInmem, long dHfileConvert) {
+        
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(),
 dColumnDistinct);
+        
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), 
dDictBuilding);
+        
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), 
dCubingInmem);
+        
this.metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), 
dHfileConvert);
+    }
+
+    private void setDependentStats() {
+        Long sourceSize = (Long) 
this.metricsEvent.get(JobPropertyEnum.SOURCE_SIZE.toString());
+        if (sourceSize != null && sourceSize != 0) {
+            if (sourceSize < MIN_SOURCE_SIZE) {
+                sourceSize = MIN_SOURCE_SIZE;
+            }
+            
this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(),
+                    ((Long) 
this.metricsEvent.get(JobPropertyEnum.BUILD_DURATION.toString())
+                            - (Long) 
this.metricsEvent.get(JobPropertyEnum.WAIT_RESOURCE_TIME.toString())) * 1.0
+                            / sourceSize);
+        } else {
+            
this.metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), 0.0);
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java
new file mode 100644
index 0000000..6316c8e
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/query/CubeSegmentRecordEventWrapper.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.query;
+
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
+
+import com.google.common.base.Strings;
+
+public class CubeSegmentRecordEventWrapper extends RecordEventWrapper {
+
+    public CubeSegmentRecordEventWrapper(RecordEvent metricsEvent) {
+        super(metricsEvent);
+
+        initStats();
+    }
+
+    private void initStats() {
+        this.metricsEvent.put(PropertyEnum.CALL_COUNT.toString(), 0L);
+        this.metricsEvent.put(PropertyEnum.TIME_SUM.toString(), 0L);
+        this.metricsEvent.put(PropertyEnum.TIME_MAX.toString(), 0L);
+        this.metricsEvent.put(PropertyEnum.SKIP_COUNT.toString(), 0L);
+        this.metricsEvent.put(PropertyEnum.SCAN_SIZE.toString(), 0L);
+        this.metricsEvent.put(PropertyEnum.RETURN_SIZE.toString(), 0L);
+        this.metricsEvent.put(PropertyEnum.AGGR_FILTER_SIZE.toString(), 0L);
+        this.metricsEvent.put(PropertyEnum.AGGR_SIZE.toString(), 0L);
+        this.metricsEvent.put(PropertyEnum.IF_SUCCESS.toString(), true);
+    }
+
+    public void setWrapper(String projectName, String cubeName, String 
segmentName, long sourceCuboidId,
+            long targetCuboidId, long filterMask) {
+        this.metricsEvent.put(PropertyEnum.PROJECT.toString(), projectName);
+        this.metricsEvent.put(PropertyEnum.CUBE.toString(), cubeName);
+        this.metricsEvent.put(PropertyEnum.SEGMENT.toString(), segmentName);
+        this.metricsEvent.put(PropertyEnum.CUBOID_SOURCE.toString(), 
sourceCuboidId);
+        this.metricsEvent.put(PropertyEnum.CUBOID_TARGET.toString(), 
targetCuboidId);
+        this.metricsEvent.put(PropertyEnum.IF_MATCH.toString(), sourceCuboidId 
== targetCuboidId);
+        this.metricsEvent.put(PropertyEnum.FILTER_MASK.toString(), filterMask);
+    }
+
+    public void setWeightPerHit(double weightPerHit) {
+        this.metricsEvent.put(PropertyEnum.WEIGHT_PER_HIT.toString(), 
weightPerHit);
+    }
+
+    public void addRPCStats(long callTimeMs, long skipCount, long scanSize, 
long returnSize, long aggrSize,
+            boolean ifSuccess) {
+        Long curCallCount = (Long) 
this.metricsEvent.get(PropertyEnum.CALL_COUNT.toString());
+        Long curTimeSum = (Long) 
this.metricsEvent.get(PropertyEnum.TIME_SUM.toString());
+        Long curTimeMax = (Long) 
this.metricsEvent.get(PropertyEnum.TIME_MAX.toString());
+        Long curSkipCount = (Long) 
this.metricsEvent.get(PropertyEnum.SKIP_COUNT.toString());
+        Long curScanSize = (Long) 
this.metricsEvent.get(PropertyEnum.SCAN_SIZE.toString());
+        Long curReturnSize = (Long) 
this.metricsEvent.get(PropertyEnum.RETURN_SIZE.toString());
+        Long curAggrAndFilterSize = (Long) 
this.metricsEvent.get(PropertyEnum.AGGR_FILTER_SIZE.toString());
+        Long curAggrSize = (Long) 
this.metricsEvent.get(PropertyEnum.AGGR_SIZE.toString());
+        Boolean curIfSuccess = (Boolean) 
this.metricsEvent.get(PropertyEnum.IF_SUCCESS.toString());
+
+        this.metricsEvent.put(PropertyEnum.CALL_COUNT.toString(), curCallCount 
+ 1);
+        this.metricsEvent.put(PropertyEnum.TIME_SUM.toString(), curTimeSum + 
callTimeMs);
+        if (curTimeMax < callTimeMs) {
+            this.metricsEvent.put(PropertyEnum.TIME_MAX.toString(), 
callTimeMs);
+        }
+        this.metricsEvent.put(PropertyEnum.SKIP_COUNT.toString(), curSkipCount 
+ skipCount);
+        this.metricsEvent.put(PropertyEnum.SCAN_SIZE.toString(), curScanSize + 
scanSize);
+        this.metricsEvent.put(PropertyEnum.RETURN_SIZE.toString(), 
curReturnSize + returnSize);
+        this.metricsEvent.put(PropertyEnum.AGGR_FILTER_SIZE.toString(), 
curAggrAndFilterSize + scanSize - returnSize);
+        this.metricsEvent.put(PropertyEnum.AGGR_SIZE.toString(), curAggrSize + 
aggrSize);
+        this.metricsEvent.put(PropertyEnum.IF_SUCCESS.toString(), curIfSuccess 
&& ifSuccess);
+    }
+
+    public Object getProperty(String key) {
+        return this.metricsEvent.get(key);
+    }
+
+    public enum PropertyEnum {
+        PROJECT("PROJECT"), CUBE("CUBE_NAME"), SEGMENT("SEGMENT_NAME"), 
CUBOID_SOURCE("CUBOID_SOURCE"), CUBOID_TARGET(
+                "CUBOID_TARGET"), IF_MATCH("IF_MATCH"), 
FILTER_MASK("FILTER_MASK"), IF_SUCCESS("IF_SUCCESS"), //
+        TIME_SUM("STORAGE_CALL_TIME_SUM"), TIME_MAX("STORAGE_CALL_TIME_MAX"), 
WEIGHT_PER_HIT(
+                "WEIGHT_PER_HIT"), CALL_COUNT("STORAGE_CALL_COUNT"), 
SKIP_COUNT("STORAGE_COUNT_SKIP"), SCAN_SIZE(
+                        "STORAGE_SIZE_SCAN"), 
RETURN_SIZE("STORAGE_SIZE_RETURN"), AGGR_FILTER_SIZE(
+                                "STORAGE_SIZE_AGGREGATE_FILTER"), 
AGGR_SIZE("STORAGE_SIZE_AGGREGATE");
+
+        private final String propertyName;
+
+        PropertyEnum(String name) {
+            this.propertyName = name;
+        }
+
+        public static PropertyEnum getByName(String name) {
+            if (Strings.isNullOrEmpty(name)) {
+                return null;
+            }
+            for (PropertyEnum property : PropertyEnum.values()) {
+                if (property.propertyName.equals(name.toUpperCase())) {
+                    return property;
+                }
+            }
+
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return propertyName;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java
new file mode 100644
index 0000000..f6cf051
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/query/QueryRecordEventWrapper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.query;
+
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
+
+import com.google.common.base.Strings;
+
+public class QueryRecordEventWrapper extends RecordEventWrapper {
+
+    public QueryRecordEventWrapper(RecordEvent metricsEvent) {
+        super(metricsEvent);
+        initStats();
+    }
+
+    private void initStats() {
+        this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(), "NULL");
+        this.metricsEvent.put(PropertyEnum.TIME_COST.toString(), 0L);
+        this.metricsEvent.put(PropertyEnum.CALCITE_RETURN_SIZE.toString(), 0L);
+        this.metricsEvent.put(PropertyEnum.STORAGE_RETURN_SIZE.toString(), 0L);
+        setDependentStats();
+    }
+
+    public void setWrapper(long queryHashCode, String queryType, String 
projectName, String realizationName,
+            int realizationType) {
+        this.metricsEvent.put(PropertyEnum.ID_CODE.toString(), queryHashCode);
+        this.metricsEvent.put(PropertyEnum.TYPE.toString(), queryType);
+        this.metricsEvent.put(PropertyEnum.PROJECT.toString(), projectName);
+        this.metricsEvent.put(PropertyEnum.REALIZATION.toString(), 
realizationName);
+        this.metricsEvent.put(PropertyEnum.REALIZATION_TYPE.toString(), 
realizationType);
+    }
+
+    public void addStorageStats(long addReturnSizeByStorage) {
+        Long curReturnSizeByStorage = (Long) 
this.metricsEvent.get(PropertyEnum.STORAGE_RETURN_SIZE.toString());
+        this.metricsEvent.put(PropertyEnum.STORAGE_RETURN_SIZE.toString(),
+                curReturnSizeByStorage + addReturnSizeByStorage);
+    }
+
+    public void setStats(long callTimeMs, long returnSizeByCalcite) {
+        this.metricsEvent.put(PropertyEnum.TIME_COST.toString(), callTimeMs);
+        this.metricsEvent.put(PropertyEnum.CALCITE_RETURN_SIZE.toString(), 
returnSizeByCalcite);
+        setDependentStats();
+    }
+
+    private void setDependentStats() {
+        this.metricsEvent.put(PropertyEnum.AGGR_FILTER_SIZE.toString(),
+                Math.max(0L, (Long) 
this.metricsEvent.get(PropertyEnum.STORAGE_RETURN_SIZE.toString())
+                        - (Long) 
this.metricsEvent.get(PropertyEnum.CALCITE_RETURN_SIZE.toString())));
+    }
+
+    public <T extends Throwable> void setStats(Class<T> exceptionClassName) {
+        this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(), 
exceptionClassName.getName());
+    }
+
+    public enum PropertyEnum {
+        ID_CODE("QUERY_HASH_CODE"), TYPE("QUERY_TYPE"), PROJECT("PROJECT"), 
REALIZATION(
+                "REALIZATION"), REALIZATION_TYPE("REALIZATION_TYPE"), 
EXCEPTION("EXCEPTION"), //
+        TIME_COST("QUERY_TIME_COST"), 
CALCITE_RETURN_SIZE("CALCITE_SIZE_RETURN"), STORAGE_RETURN_SIZE(
+                "STORAGE_SIZE_RETURN"), 
AGGR_FILTER_SIZE("CALCITE_SIZE_AGGREGATE_FILTER");
+
+        private final String propertyName;
+
+        PropertyEnum(String name) {
+            this.propertyName = name;
+        }
+
+        public static PropertyEnum getByName(String name) {
+            if (Strings.isNullOrEmpty(name)) {
+                return null;
+            }
+            for (PropertyEnum property : PropertyEnum.values()) {
+                if (property.propertyName.equals(name.toUpperCase())) {
+                    return property;
+                }
+            }
+
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return propertyName;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java
----------------------------------------------------------------------
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java
new file mode 100644
index 0000000..b9ac087
--- /dev/null
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/query/RPCRecordEventWrapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.query;
+
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.RecordEventWrapper;
+
+import com.google.common.base.Strings;
+
+public class RPCRecordEventWrapper extends RecordEventWrapper {
+
+    public RPCRecordEventWrapper(RecordEvent metricsEvent) {
+        super(metricsEvent);
+    }
+
+    public void setRPCCallWrapper(String projectName, String realizationName, 
String rpcServer) {
+        this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(), "NULL");
+        this.metricsEvent.put(PropertyEnum.PROJECT.toString(), projectName);
+        this.metricsEvent.put(PropertyEnum.REALIZATION.toString(), 
realizationName);
+        this.metricsEvent.put(PropertyEnum.RPC_SERVER.toString(), rpcServer);
+    }
+
+    public void setRPCCallStats(long callTimeMs, long skipCount, long 
scanSize, long returnSize, long aggrSize) {
+        this.metricsEvent.put(PropertyEnum.CALL_TIME.toString(), callTimeMs);
+        this.metricsEvent.put(PropertyEnum.SKIP_COUNT.toString(), skipCount); 
//Number of skips on region servers based on region meta or fuzzy filter
+        this.metricsEvent.put(PropertyEnum.SCAN_SIZE.toString(), scanSize); 
//Size scanned by region server
+        this.metricsEvent.put(PropertyEnum.RETURN_SIZE.toString(), 
returnSize);//Size returned by region server
+        this.metricsEvent.put(PropertyEnum.AGGR_FILTER_SIZE.toString(), 
scanSize - returnSize); //Size filtered and aggregated by coprocessor
+        this.metricsEvent.put(PropertyEnum.AGGR_SIZE.toString(), aggrSize); 
//Size aggregated by coprocessor
+    }
+
+    public <T extends Throwable> void setStats(Class<T> exceptionClassName) {
+        this.metricsEvent.put(PropertyEnum.EXCEPTION.toString(), 
exceptionClassName.getName());
+    }
+
+    public enum PropertyEnum {
+        PROJECT("PROJECT"), REALIZATION("REALIZATION"), 
RPC_SERVER("RPC_SERVER"), EXCEPTION("EXCEPTION"), //
+        CALL_TIME("CALL_TIME"), SKIP_COUNT("COUNT_SKIP"), 
SCAN_SIZE("SIZE_SCAN"), RETURN_SIZE(
+                "SIZE_RETURN"), AGGR_FILTER_SIZE("SIZE_AGGREGATE_FILTER"), 
AGGR_SIZE("SIZE_AGGREGATE");
+
+        private final String propertyName;
+
+        PropertyEnum(String name) {
+            this.propertyName = name;
+        }
+
+        public static PropertyEnum getByName(String name) {
+            if (Strings.isNullOrEmpty(name)) {
+                return null;
+            }
+            for (PropertyEnum property : PropertyEnum.values()) {
+                if (property.propertyName.equals(name.toUpperCase())) {
+                    return property;
+                }
+            }
+
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return propertyName;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 064d0c4..d734d4a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -234,6 +234,21 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-core-metrics</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-metrics-reporter-hive</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-metrics-reporter-kafka</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
                 <artifactId>kylin-engine-mr</artifactId>
                 <version>${project.version}</version>
             </dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/server-base/pom.xml
----------------------------------------------------------------------
diff --git a/server-base/pom.xml b/server-base/pom.xml
index f5f4051..ff80407 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -46,10 +46,22 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+        </dependency>
 
         <!-- these plug-in modules, should not have API dependencies -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-metrics-reporter-hive</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-metrics-reporter-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-storage-hbase</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/server/src/main/resources/kylinMetrics.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/kylinMetrics.xml 
b/server/src/main/resources/kylinMetrics.xml
new file mode 100644
index 0000000..92e391f
--- /dev/null
+++ b/server/src/main/resources/kylinMetrics.xml
@@ -0,0 +1,84 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xmlns="http://www.springframework.org/schema/beans";
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+            http://www.springframework.org/schema/beans/spring-beans-3.1.xsd";>
+
+    <description>Kylin Metrics Related Configuration</description>
+
+    <bean id="instantReservoir" 
class="org.apache.kylin.metrics.lib.impl.InstantReservoir"/>
+
+    <bean id="blockingReservoir" 
class="org.apache.kylin.metrics.lib.impl.BlockingReservoir">
+        <constructor-arg index="0">
+            <value>10</value>
+        </constructor-arg>
+        <constructor-arg index="1">
+            <value>100</value>
+        </constructor-arg>
+        <constructor-arg index="2">
+            <value>10</value>
+        </constructor-arg>
+    </bean>
+
+    <bean id="hiveSink" 
class="org.apache.kylin.metrics.lib.impl.hive.HiveSink"/>
+
+    <bean id="kafkaSink" 
class="org.apache.kylin.metrics.lib.impl.kafka.KafkaSink"/>
+
+    <bean id="systemCubeSink" 
class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
+        <property name="targetClass" 
value="org.apache.kylin.metrics.MetricsManager"/>
+        <property name="targetMethod" value="setSystemCubeSink"/>
+        <property name="arguments">
+            <list>
+                <ref bean="hiveSink"/>
+            </list>
+        </property>
+    </bean>
+
+    <bean id="sourceReporterBindProperties" 
class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
+        <property name="targetClass" 
value="org.apache.kylin.metrics.MetricsManager"/>
+        <property name="targetMethod" value="setSourceReporterBindProps"/>
+        <property name="arguments">
+            <map key-type="org.apache.kylin.metrics.lib.ActiveReservoir" 
value-type="java.util.List">
+                <entry key-ref="instantReservoir">
+                    <list>
+                        <bean class="org.apache.kylin.common.util.Pair">
+                            <property name="first"
+                                      
value="org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter"/>
+                            <property name="second">
+                                <props>
+                                    <prop 
key="bootstrap.servers">sandbox:9092</prop>
+                                </props>
+                            </property>
+                        </bean>
+                    </list>
+                </entry>
+                <entry key-ref="blockingReservoir">
+                    <list>
+                        <bean class="org.apache.kylin.common.util.Pair">
+                            <property name="first"
+                                      
value="org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter"/>
+                            <property name="second">
+                                <props>
+                                </props>
+                            </property>
+                        </bean>
+                    </list>
+                </entry>
+            </map>
+        </property>
+    </bean>
+
+</beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/server/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/server/src/main/webapp/WEB-INF/web.xml 
b/server/src/main/webapp/WEB-INF/web.xml
index b9cf620..cbce758 100644
--- a/server/src/main/webapp/WEB-INF/web.xml
+++ b/server/src/main/webapp/WEB-INF/web.xml
@@ -43,6 +43,7 @@
         <param-value>
                classpath:applicationContext.xml
                        classpath:kylinSecurity.xml
+            classpath:kylinMetrics.xml
                        classpath*:kylin-*-plugin.xml
                </param-value>
     </context-param>

http://git-wip-us.apache.org/repos/asf/kylin/blob/8261b3a3/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
----------------------------------------------------------------------
diff --git 
a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java 
b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
index e2f5258..cf8c2c1 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
@@ -45,7 +45,8 @@ import 
org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  * @author xduo
  */
 @RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(locations = { "classpath:applicationContext.xml", 
"classpath:kylinSecurity.xml" })
+@ContextConfiguration(locations = { "classpath:applicationContext.xml", 
"classpath:kylinSecurity.xml",
+        "classpath:kylinMetrics.xml" })
 @ActiveProfiles("testing")
 public class ServiceTestBase extends LocalFileMetadataTestCase {
 

Reply via email to