KYLIN-1311 write rest servers to file
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2a222be1 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2a222be1 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2a222be1 Branch: refs/heads/helix-201602 Commit: 2a222be173794bd2822ccc05e6efb3dca865d662 Parents: 4f41fd5 Author: shaofengshi <[email protected]> Authored: Sun Jan 24 21:41:56 2016 +0800 Committer: shaofengshi <[email protected]> Committed: Sat Feb 6 13:33:06 2016 +0800 ---------------------------------------------------------------------- build/bin/streaming_build.sh | 9 ++++-- build/bin/streaming_fillgap.sh | 8 +++-- .../org/apache/kylin/common/KylinConfig.java | 31 ++++++++++++++++++++ .../kylin/rest/helix/HelixClusterAdmin.java | 27 ++++++++++------- 4 files changed, 58 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/2a222be1/build/bin/streaming_build.sh ---------------------------------------------------------------------- diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh index cb86e29..ffb6101 100644 --- a/build/bin/streaming_build.sh +++ b/build/bin/streaming_build.sh @@ -20,15 +20,18 @@ source /etc/profile source ~/.bash_profile -STREAMING=$1 +CUBE_NAME=$1 INTERVAL=$2 DELAY=$3 MARGIN=$4 +AUTHORIZATION=$5 +KYLIN_HOST=$6 CURRENT_TIME_IN_SECOND=`date +%s` CURRENT_TIME=$((CURRENT_TIME_IN_SECOND * 1000)) START=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY)) END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL)) ID="$START"_"$END" -echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log -sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${STREAMING} ${ID} -oneoff true -start ${START} -end ${END} -streaming ${STREAMING} -margin ${MARGIN} \ No newline at end of file +echo "building for ${CUBE_NAME} ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log +#sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${CUBE_NAME} ${ID} -oneoff true -start ${START} -end ${END} -streaming ${CUBE_NAME} -margin ${MARGIN} +curl --request PUT --data "{\"start\": $START, \"end\": $END }" --header "Authorization: Basic $AUTHORIZATION" --header "Content-Type: application/json" -v ${KYLIN_HOST}/kylin/api/streaming/${CUBE_NAME}/build http://git-wip-us.apache.org/repos/asf/kylin/blob/2a222be1/build/bin/streaming_fillgap.sh ---------------------------------------------------------------------- diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh index 74d9037..31c4886 100644 --- a/build/bin/streaming_fillgap.sh +++ b/build/bin/streaming_fillgap.sh @@ -20,8 +20,10 @@ source /etc/profile source ~/.bash_profile -streaming=$1 -margin=$2 +CUBE_NAME=$1 +AUTHORIZATION=$2 +KYLIN_HOST=$3 cd ${KYLIN_HOME} -sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${streaming} fillgap -streaming ${streaming} -fillGap true -margin ${margin} \ No newline at end of file +#sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${streaming} fillgap -streaming ${streaming} -fillGap true -margin ${margin} +curl --request PUT --header "Authorization: Basic $AUTHORIZATION" --header "Content-Type: application/json" -v ${KYLIN_HOST}/kylin/api/streaming/${CUBE_NAME}/fillgap http://git-wip-us.apache.org/repos/asf/kylin/blob/2a222be1/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index 81f5827..08fb6dd 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -265,4 +265,35 @@ public class KylinConfig extends KylinConfigBase { } } + public static void writeOverrideProperties(Properties properties) throws IOException { + File propFile = getKylinProperties(); + File overrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override"); + overrideFile.createNewFile(); + FileInputStream fis2 = null; + Properties override = new Properties(); + try { + fis2 = new FileInputStream(overrideFile); + override.load(fis2); + for (Map.Entry<Object, Object> entries : properties.entrySet()) { + override.setProperty(entries.getKey().toString(), entries.getValue().toString()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + IOUtils.closeQuietly(fis2); + } + + PrintWriter pw = null; + try { + pw = new PrintWriter(overrideFile); + for (Enumeration e = override.propertyNames(); e.hasMoreElements();) { + String key = (String) e.nextElement(); + pw.println(key + "=" + override.getProperty(key)); + } + pw.close(); + } finally { + IOUtils.closeQuietly(pw); + } + + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/2a222be1/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java index 0758ef1..4da9a86 100644 --- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java +++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java @@ -40,8 +40,10 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -101,7 +103,7 @@ public class HelixClusterAdmin { addInstance(instanceName, instanceTags); startInstance(instanceName); - rebalanceWithTag(instanceTags); + rebalanceWithTag(RESOURCE_NAME_JOB_ENGINE, TAG_JOB_ENGINE); boolean startController = kylinConfig.isClusterController(); if (startController) { @@ -123,7 +125,7 @@ public class HelixClusterAdmin { // add job engine as a resource, 1 partition if (!admin.getResourcesInCluster(clusterName).contains(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE)) { - admin.addResource(clusterName, HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name()); + admin.addResource(clusterName, HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.FULL_AUTO.name()); } } @@ -134,8 +136,8 @@ public class HelixClusterAdmin { logger.warn("Resource '" + resourceName + "' already exists in cluster, remove and re-add."); admin.dropResource(clusterName, resourceName); } - admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name()); - admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER); + admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.FULL_AUTO.name()); + rebalanceWithTag(resourceName, TAG_STREAM_BUILDER); } @@ -161,13 +163,9 @@ public class HelixClusterAdmin { * Rebalance the resource with the tags * @param tags */ - protected void rebalanceWithTag(List<String> tags) { - for (String tag : tags) { - if (tag.equals(TAG_JOB_ENGINE)) { - List<String> instances = admin.getInstancesInClusterWithTag(clusterName, TAG_JOB_ENGINE); - admin.rebalance(clusterName, RESOURCE_NAME_JOB_ENGINE, instances.size(), "", tag); - } - } + protected void rebalanceWithTag(String resourceName, String tag) { + List<String> instances = admin.getInstancesInClusterWithTag(clusterName, tag); + admin.rebalance(clusterName, resourceName, instances.size(), "", tag); } /** @@ -277,6 +275,13 @@ public class HelixClusterAdmin { kylinConfig.setProperty("kylin.rest.servers", restServersInCluster); System.setProperty("kylin.rest.servers", restServersInCluster); logger.info("kylin.rest.servers update to " + restServersInCluster); + Properties properties = new Properties(); + properties.setProperty("kylin.rest.servers", restServersInCluster); + try { + KylinConfig.writeOverrideProperties(properties); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } Broadcaster.clearCache(); } }
