KYLIN-2091 Add API to init the start-point (of each partition) for streaming cube

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c92f79ad
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c92f79ad
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c92f79ad

Branch: refs/heads/master-cdh5.7
Commit: c92f79ad39e562ed32fff1a55eb979f0593ed6e3
Parents: 5429006
Author: shaofengshi <shaofeng...@apache.org>
Authored: Mon Oct 17 17:40:57 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Oct 17 17:53:56 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  2 +-
 .../java/org/apache/kylin/cube/CubeSegment.java | 12 ++++++
 .../org/apache/kylin/cube/model/CubeDesc.java   | 13 ++++++
 .../kylin/measure/topn/TopNMeasureType.java     |  2 +-
 server-base/pom.xml                             |  7 ++++
 .../kylin/rest/controller/CubeController.java   | 10 ++---
 .../rest/controller/CubeDescController.java     | 44 +++++++++++++++++++-
 .../kylin/source/kafka/job/SeekOffsetStep.java  |  8 +++-
 .../kylin/source/kafka/util/KafkaClient.java    | 22 +++++++++-
 9 files changed, 108 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4942081..73ac788 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -804,7 +804,7 @@ abstract public class KylinConfigBase implements 
Serializable {
     }
 
     public int getMaxBuildingSegments() {
-        return Integer.parseInt(getOptional("kylin.cube.building.segment.max", 
"2"));
+        return Integer.parseInt(getOptional("kylin.cube.building.segment.max", 
"10"));
     }
 
     public void setMaxBuildingSegments(int maxBuildingSegments) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index fdf1fb0..eb5b389 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -564,6 +564,12 @@ public class CubeSegment implements 
Comparable<CubeSegment>, IBuildable, ISegmen
 
     public void setSourcePartitionOffsetEnd(Map<Integer, Long> 
sourcePartitionOffsetEnd) {
         this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
+        long total = 0;
+        for (Long x : sourcePartitionOffsetEnd.values()) {
+            total += x;
+        }
+
+        this.sourceOffsetEnd = total;
     }
 
     public Map<Integer, Long> getSourcePartitionOffsetStart() {
@@ -572,5 +578,11 @@ public class CubeSegment implements 
Comparable<CubeSegment>, IBuildable, ISegmen
 
     public void setSourcePartitionOffsetStart(Map<Integer, Long> 
sourcePartitionOffsetStart) {
         this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
+        long total = 0;
+        for (Long x : sourcePartitionOffsetStart.values()) {
+            total += x;
+        }
+
+        this.sourceOffsetStart = total;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java 
b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 4195451..bf3724a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -170,6 +170,10 @@ public class CubeDesc extends RootPersistentEntity 
implements IEngineAware {
 
     private Map<TblColRef, DeriveInfo> extendedColumnToHosts = 
Maps.newHashMap();
 
+    @JsonProperty("partition_offset_start")
+    @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    private Map<Integer, Long> partitionOffsetStart = Maps.newHashMap();
+
     public boolean isEnableSharding() {
         //in the future may extend to other storage that is shard-able
         return storageType != IStorageAware.ID_HBASE && storageType != 
IStorageAware.ID_HYBRID;
@@ -1011,6 +1015,14 @@ public class CubeDesc extends RootPersistentEntity 
implements IEngineAware {
         this.partitionDateEnd = partitionDateEnd;
     }
 
+    public Map<Integer, Long> getPartitionOffsetStart() {
+        return partitionOffsetStart;
+    }
+
+    public void setPartitionOffsetStart(Map<Integer, Long> 
partitionOffsetStart) {
+        this.partitionOffsetStart = partitionOffsetStart;
+    }
+
     /** Get columns that have dictionary */
     public Set<TblColRef> getAllColumnsHaveDictionary() {
         Set<TblColRef> result = Sets.newLinkedHashSet();
@@ -1119,6 +1131,7 @@ public class CubeDesc extends RootPersistentEntity 
implements IEngineAware {
         newCubeDesc.setOverrideKylinProps(cubeDesc.getOverrideKylinProps());
         newCubeDesc.setConfig((KylinConfigExt) cubeDesc.getConfig());
         newCubeDesc.updateRandomUuid();
+        
newCubeDesc.setPartitionOffsetStart(cubeDesc.getPartitionOffsetStart());
         return newCubeDesc;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index fcf6d5e..2f93b77 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -262,7 +262,7 @@ public class TopNMeasureType extends 
MeasureType<TopNCounter<ByteArray>> {
             };
         }
 
-        if (digest.aggregations.size() == 0 ) {
+        if (digest.aggregations.size() == 0) {
             // directly query the UHC column without sorting
             unmatchedDimensions.removeAll(literalCol);
             return new CapabilityInfluence() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/server-base/pom.xml
----------------------------------------------------------------------
diff --git a/server-base/pom.xml b/server-base/pom.xml
index 1302051..67013e4 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -122,6 +122,13 @@
             <artifactId>aspectjweaver</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>${kafka.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Test & Env -->
         <dependency>
             <groupId>org.apache.kylin</groupId>

http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index be242c3..eefc452 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -536,22 +536,22 @@ public class CubeController extends BasicController {
     /**
      * get cube segment holes
      *
-     * @return true
+     * @return a list of CubeSegment, each representing a hole
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/hole", method = { RequestMethod.GET })
+    @RequestMapping(value = "/{cubeName}/holes", method = { RequestMethod.GET 
})
     @ResponseBody
     public List<CubeSegment> getHoles(@PathVariable String cubeName) {
         return cubeService.getCubeManager().calculateHoles(cubeName);
     }
 
     /**
-     * get cube segment holes
+     * fill cube segment holes
      *
-     * @return true
+     * @return a list of JobInstances to fill the holes
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/hole", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/holes", method = { RequestMethod.PUT 
})
     @ResponseBody
     public List<JobInstance> fillHoles(@PathVariable String cubeName) {
         List<JobInstance> jobs = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/server-base/src/main/java/org/apache/kylin/rest/controller/CubeDescController.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeDescController.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeDescController.java
index 4e595cd..b7eaddd 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeDescController.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeDescController.java
@@ -19,10 +19,14 @@
 package org.apache.kylin.rest.controller;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.rest.response.GeneralResponse;
 import org.apache.kylin.rest.service.CubeService;
+import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -68,8 +72,8 @@ public class CubeDescController extends BasicController {
      * Get detail information of the "Cube ID"
      * return CubeDesc instead of CubeDesc[]
      *
-     * @param cubeDescName
-     *            Cube ID
+     * @param cubeName
+     *            Cube Name
      * @return
      * @throws IOException
      */
@@ -88,6 +92,42 @@ public class CubeDescController extends BasicController {
         }
     }
 
+    /**
+     * Initiate the very beginning of a streaming cube. Will seek the latest 
offsets of each partition from the streaming
+     * source (Kafka) and record them in the cube descriptor; in the first build 
job, it will use these offsets as the start point.
+     * @param cubeName
+     * @return
+     */
+    @RequestMapping(value = "/{cubeName}/initStartOffsets", method = { 
RequestMethod.PUT })
+    @ResponseBody
+    public GeneralResponse initStartOffsets(@PathVariable String cubeName) {
+       CubeInstance cubeInstance = 
cubeService.getCubeManager().getCube(cubeName);
+
+        String msg = "";
+        if (cubeInstance == null) {
+            msg = "Cube '" + cubeName + "' not found.";
+            throw new IllegalArgumentException(msg);
+        }
+        if (cubeInstance.getSourceType() != ISourceAware.ID_STREAMING) {
+            msg = "Cube '" + cubeName + "' is not a Streaming Cube.";
+            throw new IllegalArgumentException(msg);
+        }
+
+        final GeneralResponse response = new GeneralResponse();
+        try {
+            final Map<Integer, Long> startOffsets = 
KafkaClient.getCurrentOffsets(cubeInstance);
+            CubeDesc desc = cubeInstance.getDescriptor();
+            desc.setPartitionOffsetStart(startOffsets);
+            cubeService.getCubeDescManager().updateCubeDesc(desc);
+            response.setProperty("result", "success");
+            response.setProperty("offsets", startOffsets.toString());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return response;
+    }
+
     public void setCubeService(CubeService cubeService) {
         this.cubeService = cubeService;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
index 5751095..2a3dbb5 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
@@ -61,6 +61,12 @@ public class SeekOffsetStep extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as 
the offset is provided.");
         }
 
+        final Map<Integer, Long> cubeDescStart = 
cube.getDescriptor().getPartitionOffsetStart();
+        if (cube.getSegments().size() == 1 &&  cubeDescStart != null && 
cubeDescStart.size() > 0) {
+            logger.info("This is the first segment, and has initiated the 
start offsets, will use it");
+            startOffsets = cubeDescStart;
+        }
+
         final KafkaConfig kafakaConfig = 
KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
         final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
         final String topic = kafakaConfig.getTopic();
@@ -110,8 +116,6 @@ public class SeekOffsetStep extends AbstractExecutable {
         }
 
         if (totalEndOffset > totalStartOffset) {
-            segment.setSourceOffsetStart(totalStartOffset);
-            segment.setSourceOffsetEnd(totalEndOffset);
             segment.setSourcePartitionOffsetStart(startOffsets);
             segment.setSourcePartitionOffsetEnd(endOffsets);
             segment.setName(CubeSegment.makeSegmentName(0, 0, 
totalStartOffset, totalEndOffset));

http://git-wip-us.apache.org/repos/asf/kylin/blob/c92f79ad/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 640cc53..685af6a 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -17,14 +17,19 @@
 */
 package org.apache.kylin.source.kafka.util;
 
+import com.google.common.collect.Maps;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -44,7 +49,7 @@ public class KafkaClient {
         return producer;
     }
 
-    private static Properties constructDefaultKafkaProducerProperties(String 
brokers, Properties properties){
+    private static Properties constructDefaultKafkaProducerProperties(String 
brokers, Properties properties) {
         Properties props = new Properties();
         props.put("bootstrap.servers", brokers);
         props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
@@ -111,5 +116,20 @@ public class KafkaClient {
         return consumer.position(topicPartition);
     }
 
+    public static Map<Integer, Long> getCurrentOffsets(final CubeInstance 
cubeInstance) {
+        final KafkaConfig kafakaConfig = 
KafkaConfigManager.getInstance(cubeInstance.getConfig()).getKafkaConfig(cubeInstance.getFactTable());
 
+        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
+        final String topic = kafakaConfig.getTopic();
+
+        Map<Integer, Long> startOffsets = Maps.newHashMap();
+        try (final KafkaConsumer consumer = 
KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
+            final List<PartitionInfo> partitionInfos = 
consumer.partitionsFor(topic);
+            for (PartitionInfo partitionInfo : partitionInfos) {
+                long latest = KafkaClient.getLatestOffset(consumer, topic, 
partitionInfo.partition());
+                startOffsets.put(partitionInfo.partition(), latest);
+            }
+        }
+        return startOffsets;
+    }
 }

Reply via email to