KYLIN-1311 Stream cubing auto assignment and load balance
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/688b762d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/688b762d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/688b762d Branch: refs/heads/helix-201602 Commit: 688b762dc5ee756261bc42576935029fb9180f11 Parents: c615dcf Author: shaofengshi <[email protected]> Authored: Sat Feb 6 11:49:59 2016 +0800 Committer: shaofengshi <[email protected]> Committed: Sat Feb 6 13:33:07 2016 +0800 ---------------------------------------------------------------------- build/bin/streaming_check.sh | 13 ++- build/bin/streaming_fillgap.sh | 1 - build/conf/kylin.properties | 6 +- .../apache/kylin/common/KylinConfigBase.java | 4 + .../kylin/engine/streaming/StreamingConfig.java | 33 ++++++ .../engine/streaming/cli/StreamingCLI.java | 2 +- .../streaming/monitor/StreamingMonitor.java | 11 +- .../rest/controller/ClusterController.java | 55 +++++++--- .../rest/controller/StreamingController.java | 52 ++++++++- .../kylin/rest/helix/HelixClusterAdmin.java | 69 +++++++++--- .../helix/StreamCubeBuildTransitionHandler.java | 105 ++++++++++++++----- .../rest/request/StreamingBuildRequest.java | 13 +-- .../kylin/rest/service/StreamingService.java | 27 +++-- 13 files changed, 299 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/build/bin/streaming_check.sh ---------------------------------------------------------------------- diff --git a/build/bin/streaming_check.sh b/build/bin/streaming_check.sh index fef0139..4c5431a 100644 --- a/build/bin/streaming_check.sh +++ b/build/bin/streaming_check.sh @@ -20,10 +20,9 @@ source /etc/profile source ~/.bash_profile -receivers=$1 -host=$2 -tablename=$3 -authorization=$4 -projectname=$5 -cubename=$6 -sh ${KYLIN_HOME}/bin/kylin.sh monitor -receivers ${receivers} -host ${host} -tableName ${tablename} -authorization 
${authorization} -cubeName ${cubename} -projectName ${projectname} \ No newline at end of file +CUBE_NAME=$1 +AUTHORIZATION=$2 +KYLIN_HOST=$3 + +cd ${KYLIN_HOME} +curl --request PUT --header "Authorization: Basic $AUTHORIZATION" --header "Content-Type: application/json" -v ${KYLIN_HOST}/kylin/api/streaming/${CUBE_NAME}/checkgap http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/build/bin/streaming_fillgap.sh ---------------------------------------------------------------------- diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh index 31c4886..fe8c0b5 100644 --- a/build/bin/streaming_fillgap.sh +++ b/build/bin/streaming_fillgap.sh @@ -25,5 +25,4 @@ AUTHORIZATION=$2 KYLIN_HOST=$3 cd ${KYLIN_HOME} -#sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${streaming} fillgap -streaming ${streaming} -fillGap true -margin ${margin} curl --request PUT --header "Authorization: Basic $AUTHORIZATION" --header "Content-Type: application/json" -v ${KYLIN_HOST}/kylin/api/streaming/${CUBE_NAME}/fillgap http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/build/conf/kylin.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index b7e9b28..75269de 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -2,12 +2,12 @@ # Whether this kylin run as an instance of a cluster kylin.cluster.enabled=false -# Comma separated list of zk servers; -# Optional; if absent, will use HBase zookeeper; set if use a different zk; +# Comma separated list of zk servers, for cluster coordination; +# Optional; if absent, will use HBase zookeeper; set it if use a different zk; kylin.zookeeper.address= # REST address of this instance, need be accessible from other instances; -# optional, default be <hostname>:7070 +# optional, default be <hostname_fqdn>:<port> kylin.rest.address= # whether run a cluster controller in this instance; a robust cluster need at 
least 3 controllers. http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 7c127f7..87e4566 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -541,6 +541,10 @@ public class KylinConfigBase implements Serializable { public String getClusterName() { return this.getOptional("kylin.cluster.name", getMetadataUrlPrefix()); } + + public int getClusterMaxPartitionPerRegion() { + return Integer.parseInt(getOptional("kylin.cluster.max.partition.per.resource", "100")); + } public void setClusterName(String clusterName) { setProperty("kylin.cluster.name", clusterName); http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java index f0a7ab1..ee9aed8 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java @@ -39,6 +39,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.List; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; @@ -64,6 +65,14 @@ public class StreamingConfig extends RootPersistentEntity { @JsonProperty("cubeName") 
private String cubeName; + @JsonProperty("partitions") + private List<String> partitions; + + @JsonProperty("max_gap") + private long maxGap = 30 * 60 * 1000l; // 30 minutes + @JsonProperty("max_gap_number") + private int maxGapNumber = 10; // 10 + public String getCubeName() { return cubeName; } @@ -96,6 +105,30 @@ public class StreamingConfig extends RootPersistentEntity { return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + streamingName + ".json"; } + public List<String> getPartitions() { + return partitions; + } + + public void setPartitions(List<String> partitions) { + this.partitions = partitions; + } + + public long getMaxGap() { + return maxGap; + } + + public void setMaxGap(long maxGap) { + this.maxGap = maxGap; + } + + public int getMaxGapNumber() { + return maxGapNumber; + } + + public void setMaxGapNumber(int maxGapNumber) { + this.maxGapNumber = maxGapNumber; + } + @Override public StreamingConfig clone() { try { http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java index 96ad1ad..88f5e18 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java @@ -82,7 +82,7 @@ public class StreamingCLI { } if (bootstrapConfig.isFillGap()) { final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(bootstrapConfig.getStreaming()); - final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName()); + final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName(), 
streamingConfig.getMaxGap()); logger.info("all gaps:" + StringUtils.join(gaps, ",")); for (Pair<Long, Long> gap : gaps) { startOneOffCubeStreaming(bootstrapConfig.getStreaming(), gap.getFirst(), gap.getSecond()); http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java index 9609442..9d2bd45 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java @@ -74,7 +74,7 @@ public class StreamingMonitor { sendMail(receivers, title, stringBuilder.toString()); } - public static final List<Pair<Long, Long>> findGaps(String cubeName) { + public static final List<Pair<Long, Long>> findGaps(String cubeName, long maxGapAtOnce) { List<CubeSegment> segments = getSortedReadySegments(cubeName); List<Pair<Long, Long>> gaps = Lists.newArrayList(); for (int i = 0; i < segments.size() - 1; ++i) { @@ -83,7 +83,12 @@ public class StreamingMonitor { if (first.getDateRangeEnd() == second.getDateRangeStart()) { continue; } else if (first.getDateRangeEnd() < second.getDateRangeStart()) { - gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart())); + long start = first.getDateRangeEnd(); + while (start < second.getDateRangeStart()) { + long end = Math.min(start + maxGapAtOnce, second.getDateRangeStart()); + gaps.add(Pair.newPair(start, end)); + start = end; + } } } return gaps; @@ -119,7 +124,7 @@ public class StreamingMonitor { logger.info("cube:" + cubeName + " does not exist"); return; } - List<Pair<Long, Long>> gaps = findGaps(cubeName); + List<Pair<Long, 
Long>> gaps = findGaps(cubeName, Long.MAX_VALUE); List<Pair<String, String>> overlaps = Lists.newArrayList(); StringBuilder content = new StringBuilder(); if (!gaps.isEmpty()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java index 97fff36..86a0398 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java @@ -19,23 +19,26 @@ package org.apache.kylin.rest.controller; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.job.JobInstance; -import org.apache.kylin.job.constant.JobStatusEnum; -import org.apache.kylin.job.constant.JobTimeFilterEnum; -import org.apache.kylin.rest.exception.InternalErrorException; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.impl.threadpool.DefaultScheduler; +import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.helix.HelixClusterAdmin; -import org.apache.kylin.rest.request.JobListRequest; -import org.apache.kylin.rest.service.JobService; +import org.apache.kylin.rest.request.StreamingBuildRequest; +import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Controller; -import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; 
import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; -import java.util.*; +import java.io.IOException; +import java.util.Collection; /** * @@ -56,15 +59,37 @@ public class ClusterController extends BasicController implements InitializingBe final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig); - clusterAdmin.start(); + if (kylinConfig.isClusterEnabled()) { + final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig); + clusterAdmin.start(); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - clusterAdmin.stop(); + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + clusterAdmin.stop(); + } + })); + } else { + String serverMode = kylinConfig.getServerMode(); + if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) { + logger.info("Initializing Job Engine ...."); + new Thread(new Runnable() { + @Override + public void run() { + try { + DefaultScheduler scheduler = DefaultScheduler.createInstance(); + scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); + if (!scheduler.hasStarted()) { + logger.error("scheduler has not been started"); + System.exit(1); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }).start(); } - })); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java index 209c552..e33a1c9 100644 --- 
a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java @@ -24,9 +24,11 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.engine.streaming.monitor.StreamingMonitor; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.exception.ForbiddenException; import org.apache.kylin.rest.exception.InternalErrorException; @@ -249,7 +251,12 @@ public class StreamingController extends BasicController { } streamingBuildRequest.setStreaming(streamingConfig.getName()); - streamingService.buildStream(cube, streamingBuildRequest); + try { + streamingService.buildStream(cube, streamingBuildRequest); + } catch (IOException e) { + e.printStackTrace(); + return streamingBuildRequest; + } streamingBuildRequest.setMessage("Build request is submitted successfully."); streamingBuildRequest.setSuccessful(true); return streamingBuildRequest; @@ -274,13 +281,52 @@ public class StreamingController extends BasicController { StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest(); streamingBuildRequest.setStreaming(streamingConfig.getName()); - streamingService.fillGap(cube); - streamingBuildRequest.setMessage("FillGap request is submitted successfully."); + List<Pair<Long, Long>> gaps = null; + try { + gaps = streamingService.fillGap(cube); + } catch (IOException e) { + logger.error("", e); + return streamingBuildRequest; + } + streamingBuildRequest.setMessage("FillGap request is submitted successfully, gap number: " + gaps.size()); + streamingBuildRequest.setSuccessful(true); + 
return streamingBuildRequest; + + } + + /** + * check whether a gap exists in a cube + * + * @param cubeName Cube Name + * @return + * @throws IOException + */ + @RequestMapping(value = "/{cubeName}/checkgap", method = { RequestMethod.PUT }) + @ResponseBody + public StreamingBuildRequest checkGap(@PathVariable String cubeName) { + StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName); + Preconditions.checkNotNull(streamingConfig, "Stream config for '" + cubeName + "' is not found."); + List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null); + Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found."); + CubeInstance cube = cubes.get(0); + + List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName(), streamingConfig.getMaxGap()); + logger.info("all gaps:" + StringUtils.join(gaps, ",")); + + StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest(); + streamingBuildRequest.setStreaming(streamingConfig.getName()); + if (gaps.size() > 0) { + streamingBuildRequest.setMessage(gaps.size() + " gaps in cube: " + StringUtils.join(gaps, ",")); + } else { + streamingBuildRequest.setMessage("No gap."); + } streamingBuildRequest.setSuccessful(true); return streamingBuildRequest; } + + public void setStreamingService(StreamingService streamingService) { this.streamingService = streamingService; } http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java index 4da9a86..680e371 100644 --- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java +++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java 
@@ -18,11 +18,9 @@ package org.apache.kylin.rest.helix; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.helix.*; import org.apache.helix.api.id.StateModelDefId; import org.apache.helix.controller.HelixControllerMain; @@ -32,11 +30,11 @@ import org.apache.helix.tools.StateModelConfigGenerator; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.engine.streaming.StreamingManager; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.request.StreamingBuildRequest; import org.apache.kylin.storage.hbase.HBaseConnection; -import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; -import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +66,7 @@ public class HelixClusterAdmin { private static final Logger logger = LoggerFactory.getLogger(HelixClusterAdmin.class); private final String zkAddress; - private final ZKHelixAdmin admin; + private final HelixAdmin admin; private final String clusterName; private HelixClusterAdmin(KylinConfig kylinConfig) { @@ -80,7 +78,7 @@ public class HelixClusterAdmin { zkAddress = HBaseConnection.getZKConnectString(); logger.info("no 'kylin.zookeeper.address' in kylin.properties, use HBase zookeeper " + zkAddress); } - + this.clusterName = kylinConfig.getClusterName(); this.admin = new ZKHelixAdmin(zkAddress); } @@ -130,24 +128,59 @@ public class HelixClusterAdmin { } - public void addStreamingJob(StreamingBuildRequest streamingBuildRequest) { + public void addStreamingJob(StreamingBuildRequest streamingBuildRequest) throws IOException { String resourceName = 
streamingBuildRequest.toResourceName(); - if (admin.getResourcesInCluster(clusterName).contains(resourceName)) { - logger.warn("Resource '" + resourceName + "' already exists in cluster, remove and re-add."); - admin.dropResource(clusterName, resourceName); + if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) { + logger.info("Resource '" + resourceName + "' is new, add it with 0 partitions in cluster."); + admin.addResource(clusterName, resourceName, 0, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.FULL_AUTO.name()); } - admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.FULL_AUTO.name()); - rebalanceWithTag(resourceName, TAG_STREAM_BUILDER); + IdealState idealState = admin.getResourceIdealState(clusterName, resourceName); + + StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming()); + List<String> partitions = streamingConfig.getPartitions(); + if (partitions == null) { + partitions = Lists.newArrayList(); + } + + if (partitions.size() != idealState.getNumPartitions() || idealState.getNumPartitions() >= kylinConfig.getClusterMaxPartitionPerRegion()) { + if (partitions.size() != idealState.getNumPartitions()) { + logger.error("Cluster resource partition number doesn't match with the partitions in StreamingConfig: " + resourceName); + } else { + logger.error("Partitions number for resource '" + resourceName + " exceeds the up limit: " + kylinConfig.getClusterMaxPartitionPerRegion()); + } + logger.info("Drop and create resource: " + resourceName); + cleanResourcePartitions(resourceName); + idealState = admin.getResourceIdealState(clusterName, resourceName); + streamingConfig.getPartitions().clear(); + StreamingManager.getInstance(kylinConfig).updateStreamingConfig(streamingConfig); + streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming()); + partitions = 
Lists.newArrayList(); + } + + partitions.add(streamingBuildRequest.toPartitionName()); + streamingConfig.setPartitions(partitions); + StreamingManager.getInstance(kylinConfig).updateStreamingConfig(streamingConfig); + + idealState.setNumPartitions(idealState.getNumPartitions() + 1); + admin.setResourceIdealState(clusterName, resourceName, idealState); + rebalanceWithTag(resourceName, TAG_STREAM_BUILDER); } - public void dropStreamingJob(String streamingName, long start, long end) { - String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end; - admin.dropResource(clusterName, resourceName); + + private void cleanResourcePartitions(String resourceName) { + IdealState is = admin.getResourceIdealState(clusterName, resourceName); + is.getRecord().getListFields().clear(); + is.getRecord().getMapFields().clear(); + is.setNumPartitions(0); + admin.setResourceIdealState(clusterName, resourceName, is); + + logger.info("clean all partitions in resource: " + resourceName); } /** * Start the instance and register the state model factory + * * @param instanceName * @throws Exception */ @@ -161,11 +194,11 @@ public class HelixClusterAdmin { /** * Rebalance the resource with the tags + * * @param tags */ protected void rebalanceWithTag(String resourceName, String tag) { - List<String> instances = admin.getInstancesInClusterWithTag(clusterName, tag); - admin.rebalance(clusterName, resourceName, instances.size(), "", tag); + admin.rebalance(clusterName, resourceName, 2, null, tag); } /** @@ -206,6 +239,7 @@ public class HelixClusterAdmin { /** * Check whether current kylin instance is in the leader role + * * @return */ public boolean isLeaderRole(String resourceName) { @@ -220,6 +254,7 @@ public class HelixClusterAdmin { /** * Add instance to cluster, with a tag list + * * @param instanceName should be unique in format: hostName_port * @param tags */ 
http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java index 44d8302..705d8a7 100644 --- a/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java +++ b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java @@ -11,6 +11,7 @@ import org.apache.kylin.common.KylinConfigBase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.engine.streaming.StreamingManager; import org.apache.kylin.rest.request.StreamingBuildRequest; import org.slf4j.Logger; @@ -43,43 +44,81 @@ public class StreamCubeBuildTransitionHandler extends TransitionHandler { @Transition(to = "LEADER", from = "STANDBY") public void onBecomeLeaderFromStandby(Message message, NotificationContext context) { String resourceName = message.getResourceId().stringify(); - StreamingBuildRequest streamingBuildRequest = StreamingBuildRequest.fromResourceName(resourceName); + final StreamingBuildRequest streamingBuildRequest = getStreamingBuildRequest(resourceName, message.getPartitionName()); + if (streamingBuildRequest != null && isSuccessfullyBuilt(streamingBuildRequest) == false) { + KylinConfigBase.getKylinHome(); + String segmentId = streamingBuildRequest.toPartitionName(); + String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingBuildRequest.getStreaming() + " " + segmentId + " -oneoff true -start " + streamingBuildRequest.getStart() + " -end " + streamingBuildRequest.getEnd() + " -streaming " + 
streamingBuildRequest.getStreaming(); + runCMD(cmd); + } + } - final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming()).getCubeName(); + @Transition(to = "STANDBY", from = "LEADER") + public void onBecomeStandbyFromLeader(Message message, NotificationContext context) { + String resourceName = message.getResourceId().stringify(); + logger.info("Partition " + message.getPartitionId() + " becomes as Standby"); + /* + final StreamingBuildRequest streamingBuildRequest = getStreamingBuildRequest(resourceName, message.getPartitionName()); + if (isSuccessfullyBuilt(streamingBuildRequest) == false) { + KylinConfigBase.getKylinHome(); + String segmentId = streamingBuildRequest.toPartitionName(); + String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming stop " + streamingBuildRequest.getStreaming() + " " + segmentId; + runCMD(cmd); + } + */ + } + + private boolean isSuccessfullyBuilt(StreamingBuildRequest streamingBuildRequest) { + final StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming()); + final String cubeName = streamingConfig.getCubeName(); final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName); for (CubeSegment segment : cube.getSegments()) { if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) { - logger.info("Segment " + segment.getName() + " already exist, no need rebuild."); - return; + logger.info("Segment " + segment.getName() + " already exist."); + return true; } } - KylinConfigBase.getKylinHome(); - String segmentId = streamingBuildRequest.getStart() + "_" + streamingBuildRequest.getEnd(); - String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingBuildRequest.getStreaming() + " " + segmentId + " -oneoff true -start " + streamingBuildRequest.getStart() + " -end " + 
streamingBuildRequest.getEnd() + " -streaming " + streamingBuildRequest.getStreaming(); - logger.info("Executing: " + cmd); - try { - String line; - Process p = Runtime.getRuntime().exec(cmd); - BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream())); - while ((line = input.readLine()) != null) { - logger.info(line); + return false; + } + + private StreamingBuildRequest getStreamingBuildRequest(String resourceName, String partitionName) { + String streamConfigName = resourceName.substring(HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX.length()); + int partitionId = Integer.parseInt(partitionName.substring(partitionName.lastIndexOf("_") + 1)); + + StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamConfigName); + + int retry = 0; + while ((streamingConfig.getPartitions() == null || streamingConfig.getPartitions().isEmpty() || partitionId > (streamingConfig.getPartitions().size() - 1) && retry < 10)) { + logger.error("No segment information in StreamingConfig '" + streamConfigName + "' for partition " + partitionId); + logger.error("Wait for 0.5 second..."); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + logger.error("", e); } - input.close(); - } catch (IOException err) { - logger.error("Error happens during build streaming '" + resourceName + "'", err); - throw new RuntimeException(err); + streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamConfigName); + retry++; } + if (retry >= 10) { + logger.error("No segment information in StreamingConfig '" + streamConfigName + "' for partition " + partitionId); + logger.warn("Abor building..."); + return null; + } + + String startEnd = streamingConfig.getPartitions().get(partitionId); + long start = Long.parseLong(startEnd.substring(0, startEnd.indexOf("_"))); + long end = Long.parseLong(startEnd.substring(startEnd.indexOf("_") + 1)); + StreamingBuildRequest request = new 
StreamingBuildRequest(); + request.setStreaming(streamConfigName); + request.setStart(start); + request.setEnd(end); + return request; + } - @Transition(to = "STANDBY", from = "LEADER") - public void onBecomeStandbyFromLeader(Message message, NotificationContext context) { - String resourceName = message.getResourceId().stringify(); - StreamingBuildRequest streamingBuildRequest = StreamingBuildRequest.fromResourceName(resourceName); - KylinConfigBase.getKylinHome(); - String segmentId = streamingBuildRequest.getStart() + "_" + streamingBuildRequest.getEnd(); - String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming stop " + streamingBuildRequest.getStreaming() + " " + segmentId; + private void runCMD(String cmd) { logger.info("Executing: " + cmd); try { String line; @@ -90,9 +129,10 @@ public class StreamCubeBuildTransitionHandler extends TransitionHandler { } input.close(); } catch (IOException err) { - logger.error("Error happens during build streaming '" + resourceName + "'", err); + logger.error("Error happens when running '" + cmd + "'", err); throw new RuntimeException(err); } + } @Transition(to = "STANDBY", from = "OFFLINE") @@ -104,4 +144,17 @@ public class StreamCubeBuildTransitionHandler extends TransitionHandler { public void onBecomeOfflineFromStandby(Message message, NotificationContext context) { } + + @Transition(to = "DROPPED", from = "OFFLINE") + public void onBecomeDroppedFromOffline(Message message, NotificationContext context) + throws Exception { + logger.info("Default OFFLINE->DROPPED transition invoked."); + } + + @Transition(to = "OFFLINE", from = "DROPPED") + public void onBecomeOfflineFromDropped(Message message, NotificationContext context) + throws Exception { + logger.info("Default DROPPED->OFFLINE transition invoked."); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java 
---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java index dcf91fd..201568e 100644 --- a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java +++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java @@ -81,16 +81,9 @@ public class StreamingBuildRequest { } public String toResourceName() { - return HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX + streaming + "_" + start + "_" + end; + return HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX + streaming; } - - public static StreamingBuildRequest fromResourceName(String resourceName) { - Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX)); - long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1)); - String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_")); - long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1)); - String streamingConfig = temp.substring(0, temp.lastIndexOf("_")); - - return new StreamingBuildRequest(streamingConfig, start, end); + public String toPartitionName() { + return start + "_" + end; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/688b762d/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java index 7c2cc48..6e732d9 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java @@ -18,6 +18,8 @@ package org.apache.kylin.rest.service; +import com.google.common.collect.Lists; +import 
org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; @@ -98,20 +100,33 @@ public class StreamingService extends BasicService { } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") - public void buildStream(CubeInstance cube, StreamingBuildRequest streamingBuildRequest) { + public void buildStream(CubeInstance cube, StreamingBuildRequest streamingBuildRequest) throws IOException { HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv()); - clusterAdmin.addStreamingJob(streamingBuildRequest); + try { + clusterAdmin.addStreamingJob(streamingBuildRequest); + } catch (IOException e) { + logger.error("", e); + streamingBuildRequest.setSuccessful(false); + streamingBuildRequest.setMessage("Failed to submit job for " + streamingBuildRequest.getStreaming()); + } } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") - public void fillGap(CubeInstance cube) { + public List<Pair<Long, Long>> fillGap(CubeInstance cube) throws IOException { HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv()); final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfigByCube(cube.getName()); - final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName()); - logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ",")); - for (Pair<Long, Long> gap : gaps) { + final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName(), streamingConfig.getMaxGap()); + logger.info("all gaps:" + StringUtils.join(gaps, ",")); + + List<Pair<Long, Long>> 
filledGap = Lists.newArrayList(); + int max_gaps_at_one_time = streamingConfig.getMaxGapNumber(); + for (int i = 0; i < Math.min(gaps.size(), max_gaps_at_one_time); i++) { + Pair<Long, Long> gap = gaps.get(i); StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest(streamingConfig.getName(), gap.getFirst(), gap.getSecond()); clusterAdmin.addStreamingJob(streamingBuildRequest); + filledGap.add(gap); } + + return filledGap; } }
