http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java index bbd0fc5..44c80ee 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java @@ -21,7 +21,11 @@ package org.apache.kylin.engine.mr; import java.net.InetAddress; import java.net.UnknownHostException; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.TimeZone; import java.util.regex.Matcher; import org.apache.commons.lang3.tuple.Pair; @@ -98,7 +102,6 @@ public class CubingJob extends DefaultChainedExecutable { super(); } - void setDeployEnvName(String name) { setParam(DEPLOY_ENV_NAME, name); } @@ -126,17 +129,17 @@ public class CubingJob extends DefaultChainedExecutable { return null; } switch (state) { - case ERROR: - logMsg = output.getVerboseMsg(); - break; - case DISCARDED: - logMsg = "job has been discarded"; - break; - case SUCCEED: - logMsg = "job has succeeded"; - break; - default: - return null; + case ERROR: + logMsg = output.getVerboseMsg(); + break; + case DISCARDED: + logMsg = "job has been discarded"; + break; + case SUCCEED: + logMsg = "job has succeeded"; + break; + default: + return null; } String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE; content = content.replaceAll("\\$\\{job_name\\}", getName());
http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java index 227e176..300b123 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java @@ -119,7 +119,7 @@ public class DFSFileTableReader implements TableReader { private String[] split(String line, String delim) { // FIXME CVS line should be parsed considering escapes - String str[] = StringSplitter.split(line, delim); + String[] str = StringSplitter.split(line, delim); // un-escape CSV if (DFSFileTable.DELIM_COMMA.equals(delim)) { http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java index e989042..276af65 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput.java @@ -27,7 +27,6 @@ public interface IMROutput { /** Return a helper to participate in batch cubing job flow. */ public IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg); - /** Return a helper to participate in batch cubing job flow. */ public IMRBatchInvertedIndexingOutputSide getBatchInvertedIndexingOutputSide(IISegment seg); @@ -81,7 +80,6 @@ public interface IMROutput { public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow); } - /** * Participate the batch inverted indexing flow as the output side. Responsible for saving * the output to storage (Phase 3). http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index d7676f1..5228088 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -34,7 +34,6 @@ import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep; import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.realization.IRealizationSegment; import com.google.common.base.Preconditions; @@ -209,5 +208,4 @@ public class JobBuilderSupport { return paths; } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 5472928..9b5ed67 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -104,7 +104,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { System.exit(5); } } - + // ============================================================================ protected String name; http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index a614f4b..387e695 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -66,12 +66,12 @@ public interface BatchConstants { String ARG_II_NAME = "iiname"; String ARG_SEGMENT_NAME = "segmentname"; String ARG_PARTITION = "partitions"; - String ARG_STATS_ENABLED= "statisticsenabled"; - String ARG_STATS_OUTPUT= "statisticsoutput"; - String ARG_STATS_SAMPLING_PERCENT= "statisticssamplingpercent"; - String ARG_HTABLE_NAME= "htablename"; - String ARG_INPUT_FORMAT= "inputformat"; - String ARG_LEVEL= "level"; + String ARG_STATS_ENABLED = "statisticsenabled"; + String ARG_STATS_OUTPUT = "statisticsoutput"; + String ARG_STATS_SAMPLING_PERCENT = "statisticssamplingpercent"; + String ARG_HTABLE_NAME = "htablename"; + String ARG_INPUT_FORMAT = "inputformat"; + String ARG_LEVEL = "level"; /** * logger and counter http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 8973a99..cab0c8d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -18,8 +18,17 @@ package org.apache.kylin.engine.mr.common; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; @@ -50,16 +59,8 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintWriter; -import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** * This should be in cube module. It's here in engine-mr because currently stats http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java index c1f1377..7b65ec6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java @@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.common; import java.io.IOException; import java.util.Map; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java index 78b272c..87c3211 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java @@ -18,23 +18,22 @@ package org.apache.kylin.engine.mr.common; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; +import org.apache.kylin.common.util.Bytes; import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.cube.kv.RowConstants; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; public class CuboidStatsUtil { @@ -42,6 +41,7 @@ public class CuboidStatsUtil { Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException { writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0); } + public static void writeCuboidStatistics(Configuration conf, Path outputPath, // Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage, double mapperOverlapRatio) throws IOException { Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); @@ -55,10 +55,10 @@ public class CuboidStatsUtil { try { // mapper overlap ratio at key -1 writer.append(new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio))); - + // sampling percentage at key 0 - writer.append(new LongWritable(0l), new BytesWritable(Bytes.toBytes(samplingPercentage))); - + writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage))); + for (long i : allCuboids) { valueBuf.clear(); cuboidHLLMap.get(i).writeRegisters(valueBuf); http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java index f25d41f..84fc03d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java @@ -51,6 +51,8 @@ public class HadoopJobStatusChecker { case PREP: status = JobStepStatusEnum.WAITING; break; + default: + throw new IllegalStateException(); } } catch (Exception e) { logger.error("error check status", e); http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java index 56aa3c8..42b83f0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopShellExecutable.java @@ -28,17 +28,16 @@ import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; - -import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** */ public class HadoopShellExecutable extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(HadoopShellExecutable.class); - private static final String KEY_MR_JOB = "HADOOP_SHELL_JOB_CLASS"; private static final String KEY_PARAMS = "HADOOP_SHELL_JOB_PARAMS"; http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java index ef45ed1..d32928f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java @@ -86,8 +86,13 @@ public class HadoopStatusChecker { case FAILED: case KILLING: case KILLED: + break; + default: + throw new IllegalStateException(); } break; + default: + throw new IllegalStateException(); } } catch (Exception e) { logger.error("error check status", e); http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java index ab56161..927c012 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java @@ -18,6 +18,10 @@ package org.apache.kylin.engine.mr.common; +import java.io.IOException; +import java.net.MalformedURLException; +import java.security.Principal; + import org.apache.commons.httpclient.Header; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpMethod; @@ -47,10 +51,6 @@ import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.Principal; - /** */ public class HadoopStatusGetter { @@ -68,43 +68,43 @@ public class HadoopStatusGetter { public Pair<RMAppState, FinalApplicationStatus> get(boolean useKerberosAuth) throws IOException { String applicationId = mrJobId.replace("job", "application"); String url = yarnUrl.replace("${job_id}", applicationId); - String response = useKerberosAuth ? getHttpResponseWithKerberosAuth(url) : getHttpResponse(url); + String response = useKerberosAuth ? getHttpResponseWithKerberosAuth(url) : getHttpResponse(url); logger.debug("Hadoop job " + mrJobId + " status : " + response); JsonNode root = new ObjectMapper().readTree(response); RMAppState state = RMAppState.valueOf(root.findValue("state").getTextValue()); FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(root.findValue("finalStatus").getTextValue()); return Pair.of(state, finalStatus); } - + private static String DEFAULT_KRB5_CONFIG_LOCATION = "/etc/krb5.conf"; + private String getHttpResponseWithKerberosAuth(String url) throws IOException { - String krb5ConfigPath = System.getProperty("java.security.krb5.conf"); - if(krb5ConfigPath == null) { - krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION; - } - boolean skipPortAtKerberosDatabaseLookup = true; - System.setProperty("java.security.krb5.conf", krb5ConfigPath); - System.setProperty("sun.security.krb5.debug", "true"); - System.setProperty("javax.security.auth.useSubjectCredsOnly","false"); - Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create() - .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup)) - .build(); - CloseableHttpClient client = HttpClients.custom().setDefaultAuthSchemeRegistry(authSchemeRegistry).build(); - HttpClientContext context = HttpClientContext.create(); + String krb5ConfigPath = System.getProperty("java.security.krb5.conf"); + if (krb5ConfigPath == null) { + krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION; + } + boolean skipPortAtKerberosDatabaseLookup = true; + System.setProperty("java.security.krb5.conf", krb5ConfigPath); + System.setProperty("sun.security.krb5.debug", "true"); + System.setProperty("javax.security.auth.useSubjectCredsOnly", "false"); + Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider> create().register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup)).build(); + CloseableHttpClient client = HttpClients.custom().setDefaultAuthSchemeRegistry(authSchemeRegistry).build(); + HttpClientContext context = HttpClientContext.create(); BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); Credentials useJaasCreds = new Credentials() { - public String getPassword() { - return null; - } - public Principal getUserPrincipal() { - return null; - } + public String getPassword() { + return null; + } + + public Principal getUserPrincipal() { + return null; + } }; - + credentialsProvider.setCredentials(new AuthScope(null, -1, null), useJaasCreds); context.setCredentialsProvider(credentialsProvider); String response = null; - while(response == null) { + while (response == null) { if (url.startsWith("https://")) { registerEasyHttps(); } @@ -115,19 +115,19 @@ public class HadoopStatusGetter { HttpGet httpget = new HttpGet(url); httpget.addHeader("accept", "application/json"); try { - CloseableHttpResponse httpResponse = client.execute(httpget,context); + CloseableHttpResponse httpResponse = client.execute(httpget, context); String redirect = null; org.apache.http.Header h = httpResponse.getFirstHeader("Location"); if (h != null) { - redirect = h.getValue(); + redirect = h.getValue(); if (isValidURL(redirect) == false) { logger.info("Get invalid redirect url, skip it: " + redirect); - Thread.sleep(1000l); + Thread.sleep(1000L); continue; } } else { - h = httpResponse.getFirstHeader("Refresh"); - if (h != null) { + h = httpResponse.getFirstHeader("Refresh"); + if (h != null) { String s = h.getValue(); int cut = s.indexOf("url="); if (cut >= 0) { @@ -135,27 +135,27 @@ public class HadoopStatusGetter { if (isValidURL(redirect) == false) { logger.info("Get invalid redirect url, skip it: " + redirect); - Thread.sleep(1000l); + Thread.sleep(1000L); continue; } } } } - + if (redirect == null) { response = IOUtils.toString(httpResponse.getEntity().getContent()); logger.debug("Job " + mrJobId + " get status check result.\n"); } else { url = redirect; logger.debug("Job " + mrJobId + " check redirect url " + url + ".\n"); - } + } } catch (InterruptedException e) { - logger.error(e.getMessage()); + logger.error(e.getMessage()); } finally { httpget.releaseConnection(); } } - + return response; } @@ -184,7 +184,7 @@ public class HadoopStatusGetter { redirect = h.getValue(); if (isValidURL(redirect) == false) { logger.info("Get invalid redirect url, skip it: " + redirect); - Thread.sleep(1000l); + Thread.sleep(1000L); continue; } } else { @@ -197,7 +197,7 @@ public class HadoopStatusGetter { if (isValidURL(redirect) == false) { logger.info("Get invalid redirect url, skip it: " + redirect); - Thread.sleep(1000l); + Thread.sleep(1000L); continue; } } @@ -245,5 +245,5 @@ public class HadoopStatusGetter { return false; } - + } http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java index 19ad132..7592b71 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.lang.reflect.Constructor; -import java.util.Collections; import java.util.Map; import org.apache.commons.lang.StringUtils; @@ -141,14 +140,14 @@ public class MapReduceExecutable extends AbstractExecutable { final StringBuilder output = new StringBuilder(); final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output); -// final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig()); -// if (restStatusCheckUrl == null) { -// logger.error("restStatusCheckUrl is null"); -// return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); -// } -// String mrJobId = hadoopCmdOutput.getMrJobId(); -// boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos(); -// HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth); + // final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig()); + // if (restStatusCheckUrl == null) { + // logger.error("restStatusCheckUrl is null"); + // return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); + // } + // String mrJobId = hadoopCmdOutput.getMrJobId(); + // boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos(); + // HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth); JobStepStatusEnum status = JobStepStatusEnum.NEW; while (!isDiscarded()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java index 86fedf0..7c0748a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJob.java @@ -18,15 +18,14 @@ package org.apache.kylin.engine.mr.invertedindex; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.DefaultChainedExecutable; - import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimeZone; +import org.apache.kylin.invertedindex.IISegment; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.DefaultChainedExecutable; + /** */ public class IIJob extends DefaultChainedExecutable { @@ -54,7 +53,6 @@ public class IIJob extends DefaultChainedExecutable { return getParam(SEGMENT_ID); } - public static IIJob createBuildJob(IISegment seg, String submitter, JobEngineConfig config) { return initialJob(seg, "BUILD", submitter, config); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java index a1251a3..27e2470 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java @@ -18,6 +18,8 @@ package org.apache.kylin.engine.mr.invertedindex; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; @@ -34,8 +36,6 @@ import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.metadata.model.SegmentStatusEnum; -import java.io.IOException; - /** * @author yangli9 */ http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java index c223159..e7e760a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java @@ -18,7 +18,11 @@ package org.apache.kylin.engine.mr.invertedindex; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.LongWritable; @@ -40,10 +44,7 @@ import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.invertedindex.model.IIKeyValueCodec; import org.apache.kylin.invertedindex.model.IIRow; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; +import com.google.common.collect.Lists; /** */ http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java index 2652f07..a509c1f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java @@ -18,11 +18,9 @@ package org.apache.kylin.engine.mr.invertedindex; +import java.io.IOException; + import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.IISegment; @@ -34,14 +32,11 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - /** */ public class UpdateIIInfoAfterBuildStep extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(UpdateIIInfoAfterBuildStep.class); - private static final String II_NAME = "iiName"; private static final String JOB_ID = "jobId"; @@ -72,7 +67,7 @@ public class UpdateIIInfoAfterBuildStep extends AbstractExecutable { IIInstance ii = mgr.getII(getInvertedIndexName()); IISegment segment = ii.getFirstSegment(); segment.setStatus(SegmentStatusEnum.READY); - + segment.setLastBuildJobID(getJobId()); segment.setLastBuildTime(System.currentTimeMillis()); @@ -84,5 +79,5 @@ public class UpdateIIInfoAfterBuildStep extends AbstractExecutable { return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); } } - + } http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java index 8ff784d..62fd2a8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidJob.java @@ -26,11 +26,11 @@ import org.apache.hadoop.util.ToolRunner; */ public class BaseCuboidJob extends CuboidJob { - + public BaseCuboidJob() { this.setMapperClass(HiveToBaseCuboidMapper.class); } - + public static void main(String[] args) throws Exception { CuboidJob job = new BaseCuboidJob(); int exitCode = ToolRunner.run(job, args); http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index 7a106c1..10fbba3 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -116,10 +116,10 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures()); dictionaryMap = cubeSegment.buildDictionaryMap(); - + initNullBytes(); } - + private void initNullBytes() { nullBytes = Lists.newArrayList(); nullBytes.add(HIVE_NULL); @@ -164,7 +164,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure); FunctionDesc function = measure.getFunction(); int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure]; - + int paramCount = function.getParameterCount(); String[] inputToMeasure = new String[paramCount]; @@ -182,7 +182,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL } inputToMeasure[i] = value; } - + return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index 85ae9c7..f037d2e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -58,9 +58,9 @@ public class CuboidJob extends AbstractHadoopJob { @SuppressWarnings("rawtypes") private Class<? extends Mapper> mapperClass; - + private boolean skipped = false; - + @Override public boolean isSkipped() { return skipped; @@ -69,7 +69,7 @@ public class CuboidJob extends AbstractHadoopJob { private boolean checkSkip(String cubingJobId) { if (cubingJobId == null) return false; - + ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); CubingJob cubingJob = (CubingJob) execMgr.getJob(cubingJobId); skipped = cubingJob.isLayerCubing() == false; http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java index 98ab199..1821828 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java @@ -18,11 +18,11 @@ package org.apache.kylin.engine.mr.steps; +import java.io.IOException; + import org.apache.hadoop.io.Text; import org.apache.kylin.engine.mr.KylinReducer; -import java.io.IOException; - /** * @author yangli9 */ http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index 90253ba..4225ca9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -18,6 +18,9 @@ package org.apache.kylin.engine.mr.steps; +import java.io.IOException; +import java.util.List; + import org.apache.commons.cli.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; @@ -30,7 +33,6 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; @@ -40,9 +42,6 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; - /** */ public class FactDistinctColumnsJob extends AbstractHadoopJob { http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java index c504f0d..35481fd 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java @@ -18,6 +18,10 @@ package org.apache.kylin.engine.mr.steps; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; @@ -35,10 +39,6 @@ import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - /** */ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> { @@ -82,7 +82,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K } } - + protected void handleErrorRecord(String[] record, Exception ex) throws IOException { System.err.println("Insane record: " + Arrays.toString(record)); http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 2649492..0c13df7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -20,7 +20,10 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -30,7 +33,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeInstance; @@ -40,12 +42,13 @@ import org.apache.kylin.engine.mr.KylinReducer; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CuboidStatsUtil; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** */ @@ -76,12 +79,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); cubeConfig = cube.getConfig(); cubeDesc = cube.getDescriptor(); - columnList = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc); + columnList = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc); boolean collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED)); int numberOfTasks = context.getNumReduceTasks(); int taskId = context.getTaskAttemptID().getTaskID().getId(); - + if (collectStatistics && (taskId == numberOfTasks - 1)) { // hll isStatistics = true; @@ -96,7 +99,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri colValues = Lists.newArrayList(); } } - + @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { @@ -130,7 +133,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri } } - + private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException { final Configuration conf = context.getConfiguration(); final FileSystem fs = FileSystem.get(conf); @@ -147,7 +150,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri logger.info("create file " + outputFile); } - for (ByteArray value: values) { + for (ByteArray value : values) { out.write(value.array(), value.offset(), value.length()); out.write('\n'); } @@ -172,7 +175,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri grandTotal += hll.getCountEstimate(); } double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; - + writeMapperAndCuboidStatistics(context); // for human check CuboidStatsUtil.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // cuboidHLLMap, samplingPercentage, mapperOverlapRatio); // for CreateHTableJob http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 3be5795..c525e90 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -24,13 +24,12 @@ import java.util.Collection; import java.util.List; import org.apache.hadoop.io.Text; -import org.apache.kylin.measure.BufferedMeasureEncoder; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.cuboid.CuboidScheduler; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.measure.BufferedMeasureEncoder; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import com.google.common.collect.Lists; import com.google.common.hash.HashFunction; http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java index 96e8030..83926cc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java @@ -19,7 +19,6 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; -import java.io.UnsupportedEncodingException; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.MRUtil; http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index 4b2ff37..f5076e4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -20,10 +20,8 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.util.Map; -import java.util.Map.Entry; import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; @@ -98,7 +96,7 @@ public class InMemCuboidJob extends AbstractHadoopJob { job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); logger.info("Starting: " + job.getJobName()); - + setJobClasspath(job, cube.getConfig()); // add metadata to distributed cache http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java index 673cfc0..54b7b7f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java @@ -49,7 +49,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra private int counter; private Object[] input; private Object[] result; - + private Text outputKey; private Text outputValue; @@ -67,7 +67,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra aggs = new MeasureAggregators(measuresDescs); input = new Object[measuresDescs.size()]; result = new Object[measuresDescs.size()]; - + outputKey = new Text(); outputValue = new Text(); } @@ -91,7 +91,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra outputValue.set(valueBuf.array(), 0, valueBuf.position()); context.write(outputKey, outputValue); - + counter++; if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { logger.info("Handled " + counter + " records!"); http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java index 765f080..8057602 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java @@ -18,6 +18,8 @@ package org.apache.kylin.engine.mr.steps; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapreduce.MapContext; @@ -25,8 +27,6 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.ByteArrayWritable; -import java.io.IOException; - /** */ public class MapContextGTRecordWriter extends KVGTRecordWriter { http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index f74df35..bacc77b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.google.common.collect.Maps; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.kylin.common.KylinConfig; @@ -56,11 +55,12 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** * @author ysong1, honma */ -@SuppressWarnings({"rawtypes", "unchecked"}) +@SuppressWarnings({ "rawtypes", "unchecked" }) public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private KylinConfig config; @@ -119,7 +119,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { codec = new BufferedMeasureEncoder(measureDescs); measureObjs = new Object[measureDescs.size()]; outputValue = new Text(); - + dictMeasures = Lists.newArrayList(); oldDicts = Maps.newHashMap(); newDicts = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java index e7aa698..7280d39 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java @@ -38,16 +38,15 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.model.TblColRef; - -import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + public class MergeDictionaryStep extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(MergeDictionaryStep.class); - public MergeDictionaryStep() { super(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java index 486bd9c..c774cd6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java @@ -48,15 +48,14 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; - -import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Maps; + public class MergeStatisticsStep extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(MergeStatisticsStep.class); - protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap(); public MergeStatisticsStep() { @@ -97,7 +96,7 @@ public class MergeStatisticsStep extends AbstractExecutable { LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf); while (reader.next(key, value)) { - if (key.get() == 0l) { + if (key.get() == 0L) { // sampling percentage; averageSamplingPercentage += Bytes.toInt(value.getBytes()); } else if (key.get() > 0) { http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java index 7967055..f300de9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java @@ -18,7 +18,10 @@ package org.apache.kylin.engine.mr.steps; -import java.util.*; +import java.util.Arrays; +import java.util.List; +import java.util.NavigableSet; +import java.util.Set; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; @@ -28,7 +31,6 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; -import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.dao.ExecutableDao; import org.apache.kylin.job.dao.ExecutableOutputPO; import org.apache.kylin.job.dao.ExecutablePO; @@ -52,8 +54,8 @@ public class MetadataCleanupJob extends AbstractHadoopJob { private KylinConfig config = null; - public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000l; // 2 days - public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000l; // 30 days + public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000L; // 2 days + public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000L; // 30 days /* * (non-Javadoc) @@ -144,7 +146,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob { } } } - + // delete old and completed jobs ExecutableDao executableDao = ExecutableDao.getInstance(KylinConfig.getInstanceFromEnv()); List<ExecutablePO> allExecutable = executableDao.getJobs(); http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index ff9be44..d822134 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -98,9 +98,9 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId for (int i = 0; i < parentCuboidIdActualLength; i++) { if ((mask & parentCuboidId) > 0) {// if the this bit position equals - // 1 + // 1 if ((mask & childCuboidId) > 0) {// if the child cuboid has this - // column + // column System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length); offset += splitBuffers[index].length; } http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index 7cc9dc3..d1772fd 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -77,7 +77,6 @@ public class SaveStatisticsStep extends AbstractExecutable { throw new IllegalStateException(); } - final CubeSegment newSegment = cube.getSegmentById(segmentId); if (newSegment == null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index 17ce5d0..d6435b7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory; public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterBuildStep.class); - public UpdateCubeInfoAfterBuildStep() { super(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java index 5846caa..6e8e5ed 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java @@ -19,11 +19,8 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -33,8 +30,6 @@ import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; - -import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +37,6 @@ import org.slf4j.LoggerFactory; */ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { - private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterMergeStep.class); private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java index b64afd3..3ca09cf 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java @@ -18,7 +18,8 @@ package org.apache.kylin.engine.mr.steps; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.File; import java.lang.reflect.Field; http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java index b9138fb..89d23fa 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java @@ -22,9 +22,9 @@ import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.RandomStringUtils; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java index ea4e38e..cbbaf38 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java @@ -26,9 +26,9 @@ import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.common.CuboidStatsUtil; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.junit.Test; import com.google.common.collect.Maps; http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java index df4395f..01d47b8 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java @@ -18,7 +18,7 @@ package org.apache.kylin.engine.mr.steps; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java index dd8c721..caf87e2 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java @@ -74,7 +74,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase { mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); - byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 0,-104,-106,-128, 11, 54, -105, 55,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9 , 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; + byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 0, -104, -106, -128, 11, 54, -105, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 }; Pair<Text, Text> input1 = new Pair<Text, Text>(new Text(key), new Text(value)); @@ -84,7 +84,7 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase { assertEquals(4, result.size()); - byte[] resultKey = { 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 0,-104,-106,-128, 55,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; + byte[] resultKey = { 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 0, -104, -106, -128, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 }; byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 }; Pair<Text, Text> output1 = new Pair<Text, Text>(new Text(resultKey), new Text(resultValue)); http://git-wip-us.apache.org/repos/asf/kylin/blob/5d4982e2/engine-spark/.settings/org.eclipse.core.resources.prefs ---------------------------------------------------------------------- diff --git a/engine-spark/.settings/org.eclipse.core.resources.prefs b/engine-spark/.settings/org.eclipse.core.resources.prefs new file mode 100644 index 0000000..365bbd6 --- /dev/null +++ b/engine-spark/.settings/org.eclipse.core.resources.prefs @@ -0,0 +1,5 @@ +eclipse.preferences.version=1 +encoding//src/main/java=UTF-8 +encoding//src/main/resources=UTF-8 +encoding//src/test/java=UTF-8 +encoding/<project>=UTF-8