[
https://issues.apache.org/jira/browse/KYLIN-3509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16595037#comment-16595037
]
ASF GitHub Bot commented on KYLIN-3509:
---------------------------------------
shaofengshi closed pull request #213: KYLIN-3509 Allocate more memory for
merge-dictionary step
URL: https://github.com/apache/kylin/pull/213
This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign (forked) pull request is merged, the diff is
reproduced below for the sake of provenance:
diff --git
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 58d9caacbc..4895bf0745 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1140,6 +1140,11 @@ public String getKylinJobMRLibDir() {
return getPropertiesByPrefix("kylin.engine.spark-conf.");
}
+ public Map<String, String> getSparkConfigOverrideWithSpecificName(String
configName) {
+ return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName +
".");
+ }
+
+
public double getDefaultHadoopJobReducerInputMB() {
return
Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
}
diff --git a/core-common/src/main/resources/kylin-defaults.properties
b/core-common/src/main/resources/kylin-defaults.properties
index 23c0730afa..e505def0a3 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -312,6 +312,10 @@
kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
+### Spark conf for specific job
+kylin.engine.spark-conf-mergedict.spark.executor.memory=6G
+kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
+
# manually upload spark-assembly jar to HDFS and then set this property will
avoid repeatedly uploading jar at runtime
#kylin.engine.spark-conf.spark.yarn.archive=hdfs://namenode:8020/kylin/spark/spark-libs.jar
#kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
diff --git
a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 560293cf44..5735a80975 100644
---
a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++
b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -71,4 +71,6 @@ private ExecutableConstants() {
public static final String STEP_NAME_LOOKUP_SNAPSHOT_CACHE_UPDATE =
"Update Lookup Snapshot Cache to Query Engine";
public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_META_STORE =
"Take Snapshot to Metadata Store";
public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE
= "Update Cube Info";
+
+ public static final String SPARK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY =
"mergedict";
}
diff --git
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index d9027082b6..88a58ae5a2 100644
---
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -26,6 +26,7 @@
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.MergeDictionaryJob;
import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,8 +79,8 @@ public MapReduceExecutable
createMergeDictionaryStep(CubeSegment seg, String job
MapReduceExecutable mergeDictionaryStep = new MapReduceExecutable();
mergeDictionaryStep.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd, JobEngineConfig.IN_MEM_JOB_CONF_SUFFIX);
- appendMapReduceParameters(cmd);
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME,
seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID,
seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL,
getSegmentMetadataUrl(seg.getConfig(), jobID));
diff --git
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
index 4487610885..eb67fefaa3 100644
---
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
+++
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
@@ -89,6 +89,7 @@ public SparkExecutable createMergeDictionaryStep(CubeSegment
seg, String jobID,
sparkExecutable.setJobId(jobID);
sparkExecutable.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+
sparkExecutable.setSparkConfigName(ExecutableConstants.SPARK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY);
StringBuilder jars = new StringBuilder();
diff --git
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 612239741f..dea820698e 100644
---
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -63,6 +63,7 @@
private static final String JARS = "jars";
private static final String JOB_ID = "jobId";
private static final String COUNTER_SAVE_AS = "CounterSaveAs";
+ private static final String CONFIG_NAME = "configName";
public void setClassName(String className) {
this.setParam(CLASS_NAME, className);
@@ -84,6 +85,17 @@ public String getCounterSaveAs() {
return getParam(COUNTER_SAVE_AS);
}
+ /**
+ * set spark override conf for specific job
+ */
+ public void setSparkConfigName(String configName) {
+ this.setParam(CONFIG_NAME, configName);
+ }
+
+ public String getSparkConfigName() {
+ return getParam(CONFIG_NAME);
+ }
+
private String formatArgs() {
StringBuilder stringBuilder = new StringBuilder();
for (Map.Entry<String, String> entry : getParams().entrySet()) {
@@ -92,7 +104,7 @@ private String formatArgs() {
if (entry.getKey().equals(CLASS_NAME)) {
stringBuilder.insert(0, tmp);
} else if (entry.getKey().equals(JARS) ||
entry.getKey().equals(JOB_ID)
- || entry.getKey().equals(COUNTER_SAVE_AS)) {
+ || entry.getKey().equals(COUNTER_SAVE_AS) ||
entry.getKey().equals(CONFIG_NAME)) {
// JARS is for spark-submit, not for app
continue;
} else {
@@ -221,6 +233,13 @@ protected ExecuteResult doWork(ExecutableContext context)
throws ExecuteExceptio
}
Map<String, String> sparkConfs = config.getSparkConfigOverride();
+
+ String sparkConfigName = getSparkConfigName();
+ if (sparkConfigName != null) {
+ Map<String, String> sparkSpecificConfs =
config.getSparkConfigOverrideWithSpecificName(sparkConfigName);
+ sparkConfs.putAll(sparkSpecificConfs);
+ }
+
for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
stringBuilder.append(" --conf
").append(entry.getKey()).append("=").append(entry.getValue())
.append(" ");
diff --git a/examples/test_case_data/sandbox/kylin.properties
b/examples/test_case_data/sandbox/kylin.properties
index edced8f1f3..e6a6bd6c9b 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -202,6 +202,10 @@
kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
+### Spark conf for specific job
+kylin.engine.spark-conf-mergedict.spark.executor.memory=1G
+kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
+
### QUERY PUSH DOWN ###
#kylin.query.pushdown.runner-class-name=org.apache.kylin.query.adhoc.PushDownRunnerJdbcImpl
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Allocate more memory for "Merge dictionary on yarn" step
> --------------------------------------------------------
>
> Key: KYLIN-3509
> URL: https://issues.apache.org/jira/browse/KYLIN-3509
> Project: Kylin
> Issue Type: Improvement
> Reporter: Chao Long
> Assignee: Chao Long
> Priority: Major
> Fix For: v2.5.0
>
>
> "Merge dictionary on yarn" step has been finished in KYLIN-3471, but this
> step will consume a great deal of memory, so we should allocate more memory
> for it.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)