lidongsjtu closed pull request #122: JIRA-3288 specify mapreduce.job.queuename 
when submitting a Sqoop job
URL: https://github.com/apache/kylin/pull/122
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 7fc26de350..457c832227 100644
--- 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.source.jdbc;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -40,6 +41,8 @@
 public class JdbcHiveMRInput extends HiveMRInput {
 
     private static final Logger logger = 
LoggerFactory.getLogger(JdbcHiveMRInput.class);
+    private static final String MR_OVERRIDE_QUEUE_KEY = 
"mapreduce.job.queuename";
+    private static final String DEFAULT_QUEUE = "default";
 
     public IMRBatchCubingInputSide 
getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
         return new BatchCubingInputSide(flatDesc);
@@ -125,6 +128,14 @@ private TblColRef determineSplitColumn() {
             return splitColumn;
         }
 
+        private String getSqoopJobQueueName(KylinConfig config) {
+            Map<String, String> mrConfigOverride = 
config.getMRConfigOverride();
+            if (mrConfigOverride.containsKey(MR_OVERRIDE_QUEUE_KEY)) {
+                return mrConfigOverride.get(MR_OVERRIDE_QUEUE_KEY);
+            }
+            return DEFAULT_QUEUE;
+        }
+
         private AbstractExecutable createSqoopToFlatHiveStep(String 
jobWorkingDir, String cubeName) {
             KylinConfig config = getConfig();
             PartitionDesc partitionDesc = 
flatDesc.getDataModel().getPartitionDesc();
@@ -162,13 +173,16 @@ private AbstractExecutable 
createSqoopToFlatHiveStep(String jobWorkingDir, Strin
                 bquery += " WHERE " + partitionString;
             }
 
-            String cmd = String.format(String.format(
+            //related to 
"kylin.engine.mr.config-override.mapreduce.job.queuename"
+            String queueName = getSqoopJobQueueName(config);
+            String cmd = String.format(
                     "%s/sqoop import 
-Dorg.apache.sqoop.splitter.allow_text_splitter=true "
+                            + "-Dmapreduce.job.queuename=%s "
                             + "--connect \"%s\" --driver %s --username %s 
--password %s --query \"%s AND \\$CONDITIONS\" "
                             + "--target-dir %s/%s --split-by %s.%s 
--boundary-query \"%s\" --null-string '' "
                             + "--fields-terminated-by '%s' --num-mappers %d",
-                    sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, 
selectSql, jobWorkingDir, hiveTable,
-                    splitTable, splitColumn, bquery, filedDelimiter, 
mapperNum));
+                    sqoopHome, queueName, connectionUrl, driverClass, 
jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
+                    splitTable, splitColumn, bquery, filedDelimiter, 
mapperNum);
             logger.debug(String.format("sqoop cmd:%s", cmd));
             CmdStep step = new CmdStep();
             step.setCmd(cmd);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to