http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java deleted file mode 100644 index 56c9659..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -import java.io.IOException; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author yangli9 - */ -public class FactDistinctColumnsJob extends AbstractHadoopJob { - protected static final Logger log = LoggerFactory.getLogger(FactDistinctColumnsJob.class); - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_TABLE_NAME); - parseOptions(options, args); - - job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); - Configuration jobConf = job.getConfiguration(); - - String cubeName = getOptionValue(OPTION_CUBE_NAME); - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - String intermediateTable = getOptionValue(OPTION_TABLE_NAME); - - // ---------------------------------------------------------------------------- - // add metadata to distributed cache - CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - CubeInstance cubeInstance = cubeMgr.getCube(cubeName); - - jobConf.set(BatchConstants.CFG_CUBE_NAME, cubeName); - System.out.println("Starting: " + job.getJobName()); - - setJobClasspath(job); - - setupMapper(intermediateTable); - 
setupReducer(output); - - // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment(); - attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration()); - - return waitForCompletion(job); - - } catch (Exception e) { - logger.error("error in FactDistinctColumnsJob", e); - printUsage(options); - throw e; - } finally { - if (job != null) { - cleanupTempConfFile(job.getConfiguration()); - } - } - - } - - private void setupMapper(String intermediateTable) throws IOException { - // FileInputFormat.setInputPaths(job, input); - - String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable); - HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]); - - job.setInputFormatClass(HCatInputFormat.class); - job.setMapperClass(FactDistinctColumnsMapper.class); - job.setCombinerClass(FactDistinctColumnsCombiner.class); - job.setMapOutputKeyClass(ShortWritable.class); - job.setMapOutputValueClass(Text.class); - } - - private void setupReducer(Path output) throws IOException { - job.setReducerClass(FactDistinctColumnsReducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(Text.class); - - FileOutputFormat.setOutputPath(job, output); - job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString()); - - job.setNumReduceTasks(1); - - deletePath(job.getConfiguration(), output); - } - - public static void main(String[] args) throws Exception { - FactDistinctColumnsJob job = new FactDistinctColumnsJob(); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } -}
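FactDistinctColumnsJob, removed above, wires a classic distinct-value extraction: the mapper emits (column index, value) pairs keyed by a ShortWritable, the combiner and reducer collapse duplicates, and setupReducer() pins the job to a single reducer so each column's distinct values end up in one output file named after the column. A minimal, self-contained sketch of that shuffle contract follows — plain Java with no Hadoop dependencies; the class name, sample rows, and column indexes are illustrative only, not taken from the Kylin code base.

import java.util.*;

public class FactDistinctColumnsSketch {

    // Mapper phase: emit one (columnIndex, value) pair per dictionary column per row,
    // skipping nulls, as FactDistinctColumnsMapper does with its factDictCols array.
    static List<Map.Entry<Short, String>> map(String[] row, int[] factDictCols) {
        List<Map.Entry<Short, String>> out = new ArrayList<>();
        for (int i : factDictCols) {
            if (row[i] != null) {
                out.add(new AbstractMap.SimpleEntry<>((short) i, row[i]));
            }
        }
        return out;
    }

    // Reduce phase: group by column index and dedupe, mirroring what the
    // combiner/reducer pair achieves over the sorted shuffle output.
    static Map<Short, Set<String>> reduce(List<Map.Entry<Short, String>> pairs) {
        Map<Short, Set<String>> distinct = new TreeMap<>();
        for (Map.Entry<Short, String> p : pairs) {
            distinct.computeIfAbsent(p.getKey(), k -> new HashSet<>()).add(p.getValue());
        }
        return distinct;
    }

    public static void main(String[] args) {
        String[][] rows = { { "CN", "2014-01-01", "10" }, { "US", "2014-01-01", "20" }, { "CN", "2014-01-02", "10" } };
        int[] factDictCols = { 0, 1 }; // pretend only columns 0 and 1 are dictionary-encoded
        List<Map.Entry<Short, String>> shuffled = new ArrayList<>();
        for (String[] row : rows) {
            shuffled.addAll(map(row, factDictCols));
        }
        // Prints {0=[CN, US], 1=[2014-01-01, 2014-01-02]} (set order may vary)
        System.out.println(reduce(shuffled));
    }
}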
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java deleted file mode 100644 index 72802aa..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.hive.hcatalog.data.HCatRecord; -import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; -import org.apache.hive.hcatalog.data.schema.HCatSchema; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.mr.KylinMapper; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.RowKeyDesc; -import org.apache.kylin.dict.DictionaryManager; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc; -import org.apache.kylin.metadata.model.TblColRef; - -/** - * @author yangli9 - */ -public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ShortWritable, Text> { - - private String cubeName; - private CubeInstance cube; - private CubeDesc cubeDesc; - private int[] factDictCols; - - private CubeJoinedFlatTableDesc intermediateTableDesc; - - private ShortWritable outputKey = new ShortWritable(); - private Text outputValue = new Text(); - private int errorRecordCounter; - - private HCatSchema schema = null; - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - - Configuration conf = context.getConfiguration(); - - KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf); - cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); - cube = CubeManager.getInstance(config).getCube(cubeName); - cubeDesc = cube.getDescriptor(); - intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null); - - long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = 
Cuboid.findById(cubeDesc, baseCuboidId); - List<TblColRef> columns = baseCuboid.getColumns(); - - ArrayList<Integer> factDictCols = new ArrayList<Integer>(); - RowKeyDesc rowkey = cubeDesc.getRowkey(); - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - for (int i = 0; i < columns.size(); i++) { - TblColRef col = columns.get(i); - if (rowkey.isUseDictionary(col) == false) - continue; - - String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0]; - if (cubeDesc.getModel().isFactTable(scanTable)) { - factDictCols.add(i); - } - } - this.factDictCols = new int[factDictCols.size()]; - for (int i = 0; i < factDictCols.size(); i++) - this.factDictCols[i] = factDictCols.get(i); - - schema = HCatInputFormat.getTableSchema(context.getConfiguration()); - } - - @Override - public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException { - - try { - - int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes(); - HCatFieldSchema fieldSchema = null; - for (int i : factDictCols) { - outputKey.set((short) i); - fieldSchema = schema.get(flatTableIndexes[i]); - Object fieldValue = record.get(fieldSchema.getName(), schema); - if (fieldValue == null) - continue; - byte[] bytes = Bytes.toBytes(fieldValue.toString()); - outputValue.set(bytes, 0, bytes.length); - context.write(outputKey, outputValue); - } - } catch (Exception ex) { - handleErrorRecord(record, ex); - } - - } - - private void handleErrorRecord(HCatRecord record, Exception ex) throws IOException { - - System.err.println("Insane record: " + record.getAll()); - ex.printStackTrace(System.err); - - errorRecordCounter++; - if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) { - if (ex instanceof IOException) - throw (IOException) ex; - else if (ex instanceof RuntimeException) - throw (RuntimeException) ex; - else - throw new RuntimeException("", ex); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java deleted file mode 100644 index 89f90ba..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.hadoop.cube; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.mr.KylinReducer; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.metadata.model.TblColRef; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; - -/** - * @author yangli9 - */ -public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> { - - private List<TblColRef> columnList = new ArrayList<TblColRef>(); - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - - Configuration conf = context.getConfiguration(); - KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf); - String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); - CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); - CubeDesc cubeDesc = cube.getDescriptor(); - - long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - columnList = baseCuboid.getColumns(); - } - - @Override - public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { - TblColRef col = columnList.get(key.get()); - - HashSet<ByteArray> set = new HashSet<ByteArray>(); - for (Text textValue : values) { - ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength())); - set.add(value); - } - - Configuration conf = context.getConfiguration(); - FileSystem fs = FileSystem.get(conf); - String outputPath = conf.get(BatchConstants.OUTPUT_PATH); - FSDataOutputStream out = fs.create(new Path(outputPath, col.getName())); - - try { - for (ByteArray value : set) { - out.write(value.data); - out.write('\n'); - } - } finally { - out.close(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java deleted file mode 100644 index befa16f..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -///* -// * Copyright 2013-2014 eBay Software Foundation -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package org.apache.kylin.index.cube; -// -//import org.apache.commons.cli.Options; -//import org.apache.hadoop.fs.FileSystem; -//import org.apache.hadoop.fs.Path; -//import org.apache.hadoop.io.LongWritable; -//import org.apache.hadoop.io.Text; -//import org.apache.hadoop.mapreduce.Job; -//import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -//import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -//import org.apache.hadoop.util.ToolRunner; -// -//import org.apache.kylin.cube.CubeInstance; -//import org.apache.kylin.cube.CubeManager; -//import org.apache.kylin.cube.cuboid.Cuboid; -//import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; -//import org.apache.kylin.cube.kv.RowKeyEncoder; -//import org.apache.kylin.index.AbstractHadoopJob; -//import org.apache.kylin.metadata.model.cube.CubeDesc; -// -///** -// * @author xjiang -// * -// */ -// -//public class KeyDistributionJob extends AbstractHadoopJob { -// -// public static final String JOB_TITLE = "Kylin Row Key Distribution Job"; -// public static final String KEY_HEADER_LENGTH = "key_header_length"; -// public static final String KEY_COLUMN_PERCENTAGE = "key_column_percentage"; -// public static final String KEY_SPLIT_NUMBER = "key_split_number"; -// -// /* (non-Javadoc) -// * @see org.apache.hadoop.util.Tool#run(java.lang.String[]) -// */ -// @Override -// public int run(String[] args) throws Exception { -// Options options = new Options(); -// -// try { -// options.addOption(OPTION_INPUT_PATH); -// options.addOption(OPTION_OUTPUT_PATH); -// options.addOption(OPTION_METADATA_URL); -// options.addOption(OPTION_CUBE_NAME); -// options.addOption(OPTION_KEY_COLUMN_PERCENTAGE); -// options.addOption(OPTION_KEY_SPLIT_NUMBER); -// parseOptions(options, args); -// -// // start job -// String jobName = JOB_TITLE + getOptionsAsString(); -// System.out.println("Starting: " + jobName); -// Job job = Job.getInstanceFromEnv(getConf(), jobName); -// -// // set job configuration - basic -// setJobClasspath(job); -// addInputDirs(getOptionValue(OPTION_INPUT_PATH), job); -// -// Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); -// FileOutputFormat.setOutputPath(job, output); -// //job.getConfiguration().set("dfs.block.size", "67108864"); -// -// // set job configuration - key prefix size & key split number -// String 
keyColumnPercentage = getOptionValue(OPTION_KEY_COLUMN_PERCENTAGE); -// job.getConfiguration().set(KEY_COLUMN_PERCENTAGE, keyColumnPercentage); -// String metadataUrl = validateMetadataUrl(getOptionValue(OPTION_METADATA_URL)); -// String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); -// int keyHeaderLen = getKeyHeaderLength(metadataUrl, cubeName); -// job.getConfiguration().set(KEY_HEADER_LENGTH, String.valueOf(keyHeaderLen)); -// job.getConfiguration().set(KEY_SPLIT_NUMBER, getOptionValue(OPTION_KEY_SPLIT_NUMBER)); -// -// // Mapper -// job.setInputFormatClass(SequenceFileInputFormat.class); -// job.setMapperClass(KeyDistributionMapper.class); -// job.setMapOutputKeyClass(Text.class); -// job.setMapOutputValueClass(LongWritable.class); -// -// // Combiner, not needed any more as mapper now does the grouping -// //job.setCombinerClass(KeyDistributionCombiner.class); -// -// // Reducer - only one -// job.setReducerClass(KeyDistributionReducer.class); -// // use sequence file as output -// job.setOutputFormatClass(SequenceFileOutputFormat.class); -// // key is text -// job.setOutputKeyClass(Text.class); -// // value is long -// job.setOutputValueClass(LongWritable.class); -// job.setNumReduceTasks(1); -// -// FileSystem fs = FileSystem.get(job.getConfiguration()); -// if (fs.exists(output)) -// fs.delete(output, true); -// -// return waitForCompletion(job); -// } catch (Exception e) { -// printUsage(options); -// e.printStackTrace(System.err); -// return 2; -// } -// } -// -// private int getKeyHeaderLength(String metadataUrl, String cubeName) { -// CubeManager cubeMgr = CubeManager.getInstanceFromEnv(metadataUrl); -// CubeInstance cubeInstance = cubeMgr.getCube(cubeName); -// CubeDesc cubeDesc = cubeInstance.getDescriptor(); -// long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); -// Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); -// RowKeyEncoder rowKeyEncoder = -// (RowKeyEncoder) AbstractRowKeyEncoder.createInstance(cubeInstance.getTheOnlySegment(), -// baseCuboid); -// -// return rowKeyEncoder.getHeaderLength(); -// -// } -// -// public static void main(String[] args) throws Exception { -// int exitCode = ToolRunner.run(new KeyDistributionJob(), args); -// System.exit(exitCode); -// } -// } http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionMapper.java deleted file mode 100644 index f145bde..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionMapper.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -///* -// * Copyright 2013-2014 eBay Software Foundation -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package org.apache.kylin.index.cube; -// -//import java.io.IOException; -// -//import org.apache.hadoop.io.LongWritable; -//import org.apache.hadoop.io.Text; -//import org.apache.hadoop.mapreduce.Mapper; -// -///** -// * @author xjiang -// * -// */ -//public class KeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> { -// -// private int headerLength; -// -// private Text currentKey; -// private long outputLong; -// private Text outputKey; -// private LongWritable outputValue; -// private int columnPercentage; -// private int allRowCount; -// -// @Override -// protected void setup(Context context) throws IOException { -//super.publishConfiguration(context.getConfiguration()); - -// String percentStr = context.getConfiguration().get(KeyDistributionJob.KEY_COLUMN_PERCENTAGE); -// this.columnPercentage = Integer.valueOf(percentStr).intValue(); -// if (this.columnPercentage <= 0 || this.columnPercentage >= 100) { -// this.columnPercentage = 20; -// } -// String headerLenStr = context.getConfiguration().get(KeyDistributionJob.KEY_HEADER_LENGTH); -// this.headerLength = Integer.valueOf(headerLenStr).intValue(); -// -// currentKey = new Text(); -// outputLong = 0; -// outputKey = new Text(); -// outputValue = new LongWritable(1); -// allRowCount = 0; -// } -// -// @Override -// protected void cleanup(Context context) throws IOException, InterruptedException { -// emit(context); // emit the last holding record -// -// byte[] zerokey = new byte[] { 0 }; -// outputKey.set(zerokey); -// outputValue.set(allRowCount); -// context.write(outputKey, outputValue); -// } -// -// @Override -// public void map(Text key, Text value, Context context) throws IOException, InterruptedException { -// byte[] bytes = key.getBytes(); -// int columnLength = bytes.length - this.headerLength; -// int columnPrefixLen = columnLength * this.columnPercentage / 100; -// if (columnPrefixLen == 0 && columnLength > 0) { -// columnPrefixLen = 1; -// } -// if (columnPrefixLen > 0) { -// currentKey.set(bytes, 0, this.headerLength + columnPrefixLen); -// } else { -// currentKey.set(bytes); -// } -// -// allRowCount++; -// -// if (outputKey.getLength() == 0) { // first record -// outputKey.set(currentKey); -// outputLong = 1; -// } else if (outputKey.equals(currentKey)) { // same key, note input is sorted -// outputLong++; -// } else { // the next key -// emit(context); -// outputKey.set(currentKey); -// outputLong = 1; -// } -// } -// -// private void emit(Context context) throws IOException, InterruptedException { -// if (outputLong == 0) -// return; -// -// outputValue.set(outputLong); -// context.write(outputKey, outputValue); -// } -// } 
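The commented-out KeyDistributionMapper above samples key distribution by truncating each row key to its fixed header plus a configurable percentage of the remaining column bytes, then counting consecutive identical prefixes (the input is sorted, so grouping in the mapper replaces a combiner). Below is a small plain-Java sketch of that truncate-and-count step; the header length, percentage, and sample keys are invented for illustration.

import java.util.*;

public class KeyPrefixCountSketch {
    public static void main(String[] args) {
        int headerLength = 2;      // fixed row key header kept intact
        int columnPercentage = 67; // fraction of the column bytes kept in the prefix

        // Sorted row keys, as the mapper would read them from a sequence file.
        byte[][] sortedKeys = {
            { 0, 1, 'A', 'A', 'X' },
            { 0, 1, 'A', 'A', 'Y' },
            { 0, 1, 'A', 'B', 'X' },
        };

        Map<String, Long> counts = new LinkedHashMap<>();
        for (byte[] key : sortedKeys) {
            int columnLength = key.length - headerLength;
            int columnPrefixLen = columnLength * columnPercentage / 100;
            if (columnPrefixLen == 0 && columnLength > 0) {
                columnPrefixLen = 1; // always keep at least one column byte
            }
            byte[] prefix = Arrays.copyOf(key, headerLength + columnPrefixLen);
            counts.merge(Arrays.toString(prefix), 1L, Long::sum);
        }
        // Prints {[0, 1, 65, 65]=2, [0, 1, 65, 66]=1}
        System.out.println(counts);
    }
}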
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionReducer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionReducer.java deleted file mode 100644 index dd02910..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionReducer.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -///* -// * Copyright 2013-2014 eBay Software Foundation -// * -// * Licensed under the Apache License, Version 2.0 (the "License"); -// * you may not use this file except in compliance with the License. -// * You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. 
-// */ -// -//package org.apache.kylin.index.cube; -// -//import java.io.IOException; -// -//import org.apache.hadoop.io.LongWritable; -//import org.apache.hadoop.io.Text; -//import org.apache.hadoop.mapreduce.Reducer; -//import org.apache.hadoop.util.StringUtils; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -///** -// * @author xjiang -// * -// */ -//public class KeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> { -// -// private static final Logger logger = LoggerFactory.getLogger(KeyDistributionReducer.class); -// -// private LongWritable outputValue; -// private boolean isTotalCount; -// private long totalCount; -// private int splitNumber; -// private long splitQuota; -// private long splitRemain; -// -// @Override -// protected void setup(Context context) throws IOException, InterruptedException { -// super.publishConfiguration(context.getConfiguration()); - -// String splitStr = context.getConfiguration().get(KeyDistributionJob.KEY_SPLIT_NUMBER); -// splitNumber = Integer.valueOf(splitStr).intValue(); -// outputValue = new LongWritable(); -// isTotalCount = true; -// totalCount = 0; -// splitQuota = 0; -// splitRemain = 0; -// } -// -// @Override -// protected void cleanup(Context context) throws IOException, InterruptedException { -// logger.info("---------------"); -// long splitCount = splitQuota - splitRemain; -// logger.info("Total Count = " + totalCount + ", Left Count = " + splitCount); -// } -// -// @Override -// public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, -// InterruptedException { -// -// // calculate split quota -// if (isTotalCount) { -// for (LongWritable count : values) { -// totalCount += count.get(); -// } -// splitQuota = totalCount / splitNumber; -// splitRemain = splitQuota; -// isTotalCount = false; -// return; -// } -// -// // output key when split quota is used up -// for (LongWritable count : values) { -// splitRemain -= count.get(); -// } -// if (splitRemain <= 0) { -// long splitCount = splitQuota - splitRemain; -// String hexKey = StringUtils.byteToHexString(key.getBytes()); -// logger.info(hexKey + "\t\t" + splitCount); -// -// outputValue.set(splitCount); -// context.write(key, outputValue); -// splitRemain = splitQuota; -// } -// -// } -// } http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java deleted file mode 100644 index 73faa6c..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.job.constant.BatchConstants; - -/** - * @author ysong1 - */ -public class MergeCuboidJob extends CuboidJob { - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_SEGMENT_NAME); - options.addOption(OPTION_INPUT_PATH); - options.addOption(OPTION_OUTPUT_PATH); - parseOptions(options, args); - - String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); - String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase(); - KylinConfig config = KylinConfig.getInstanceFromEnv(); - CubeManager cubeMgr = CubeManager.getInstance(config); - CubeInstance cube = cubeMgr.getCube(cubeName); - - // start job - String jobName = getOptionValue(OPTION_JOB_NAME); - System.out.println("Starting: " + jobName); - job = Job.getInstance(getConf(), jobName); - - setJobClasspath(job); - - // set inputs - addInputDirs(getOptionValue(OPTION_INPUT_PATH), job); - - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - FileOutputFormat.setOutputPath(job, output); - - // Mapper - job.setInputFormatClass(SequenceFileInputFormat.class); - job.setMapperClass(MergeCuboidMapper.class); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Text.class); - - // Reducer - only one - job.setReducerClass(CuboidReducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - - // set job configuration - job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); - - // add metadata to distributed cache - attachKylinPropsAndMetadata(cube, job.getConfiguration()); - - setReduceTaskNum(job, config, cubeName, 0); - - this.deletePath(job.getConfiguration(), output); - - return waitForCompletion(job); - } catch (Exception e) { - logger.error("error in MergeCuboidJob", e); - printUsage(options); - throw e; - } finally { - if (job != null) - cleanupTempConfFile(job.getConfiguration()); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java deleted file mode 100644 index 2528e07..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import com.google.common.collect.Lists; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.mr.KylinMapper; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.common.util.SplittedBytes; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.common.RowKeySplitter; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.dict.DictionaryManager; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.measure.MeasureCodec; -import org.apache.kylin.measure.MeasureIngester; -import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TblColRef; - -/** - * @author ysong1, honma - */ -public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { - - private KylinConfig config; - private String cubeName; - private String segmentName; - private CubeManager cubeManager; - private CubeInstance cube; - private CubeDesc cubeDesc; - private CubeSegment mergedCubeSegment; - private CubeSegment sourceCubeSegment;// Must be unique during a mapper's - // life cycle - - - // for re-encode measures that use dictionary - private List<Pair<Integer, MeasureIngester>> dictMeasures; - private Map<TblColRef, Dictionary<String>> oldDicts; - private Map<TblColRef, Dictionary<String>> newDicts; - private List<MeasureDesc> measureDescs; - private MeasureCodec codec; - private Object[] measureObjs; - private ByteBuffer valueBuf; - private Text outputValue; - - private Text outputKey = new Text(); - - private byte[] newKeyBuf; - private RowKeySplitter rowKeySplitter; - - private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>(); - - private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); - - private Boolean checkNeedMerging(TblColRef col) throws IOException { - Boolean ret = dictsNeedMerging.get(col); - if (ret != null) - return ret; - else { - 
ret = cubeDesc.getRowkey().isUseDictionary(col); - if (ret) { - String dictTable = (String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0]; - ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable); - } - dictsNeedMerging.put(col, ret); - return ret; - } - } - - private String extractJobIDFromPath(String path) { - Matcher matcher = JOB_NAME_PATTERN.matcher(path); - // check the first occurrence - if (matcher.find()) { - return matcher.group(1); - } else { - throw new IllegalStateException("Can not extract job ID from file path : " + path); - } - } - - private CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) { - for (CubeSegment segment : cubeInstance.getSegments()) { - String lastBuildJobID = segment.getLastBuildJobID(); - if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) { - return segment; - } - } - - throw new IllegalStateException("No merging segment's last build job ID equals " + jobID); - - } - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.publishConfiguration(context.getConfiguration()); - - cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); - segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase(); - - config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration()); - - cubeManager = CubeManager.getInstance(config); - cube = cubeManager.getCube(cubeName); - cubeDesc = cube.getDescriptor(); - mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); - - // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length; - newKeyBuf = new byte[256];// size will auto-grow - - // decide which source segment - InputSplit inputSplit = context.getInputSplit(); - String filePath = ((FileSplit) inputSplit).getPath().toString(); - System.out.println("filePath:" + filePath); - String jobID = extractJobIDFromPath(filePath); - System.out.println("jobID:" + jobID); - sourceCubeSegment = findSegmentWithUuid(jobID, cube); - System.out.println(sourceCubeSegment); - - this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255); - - - measureDescs = cubeDesc.getMeasures(); - codec = new MeasureCodec(measureDescs); - measureObjs = new Object[measureDescs.size()]; - valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - outputValue = new Text(); - - dictMeasures = Lists.newArrayList(); - for (int i = 0; i < measureDescs.size(); i++) { - MeasureDesc measureDesc = measureDescs.get(i); - MeasureType measureType = measureDesc.getFunction().getMeasureType(); - if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) { - dictMeasures.add(Pair.newPair(i, measureType.newIngester())); - } - } - if (dictMeasures.size() > 0) { - oldDicts = sourceCubeSegment.buildDictionaryMap(); - newDicts = mergedCubeSegment.buildDictionaryMap(); - } - } - - @Override - public void map(Text key, Text value, Context context) throws IOException, InterruptedException { - long cuboidID = rowKeySplitter.split(key.getBytes(), key.getBytes().length); - Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID); - - SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers(); - int bufOffset = 0; - BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN); - bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN; - - for (int i = 0; i < cuboid.getColumns().size(); ++i) { 
TblColRef col = cuboid.getColumns().get(i); - - if (this.checkNeedMerging(col)) { - // if dictionary on fact table column, needs rewrite - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col)); - Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col)); - - while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) { - byte[] oldBuf = newKeyBuf; - newKeyBuf = new byte[2 * newKeyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length); - } - - int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length); - int idInMergedDict; - - int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset); - if (size < 0) { - idInMergedDict = mergedDict.nullId(); - } else { - idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size); - } - - BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId()); - bufOffset += mergedDict.getSizeOfId(); - } else { - // keep as it is - while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) { - byte[] oldBuf = newKeyBuf; - newKeyBuf = new byte[2 * newKeyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length); - } - - System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length); - bufOffset += splittedByteses[i + 1].length; - } - } - byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset); - outputKey.set(newKey, 0, newKey.length); - - - // re-encode measures if dictionary is used - if (dictMeasures.size() > 0) { - codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs); - for (Pair<Integer, MeasureIngester> pair : dictMeasures) { - int i = pair.getFirst(); - MeasureIngester ingester = pair.getSecond(); - measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts); - } - valueBuf.clear(); - codec.encode(measureObjs, valueBuf); - outputValue.set(valueBuf.array(), 0, valueBuf.position()); - value = outputValue; - } - - context.write(outputKey, value); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java deleted file mode 100644 index e46093c..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.job.constant.JobStatusEnum; -import org.apache.kylin.job.dao.ExecutableDao; -import org.apache.kylin.job.dao.ExecutableOutputPO; -import org.apache.kylin.job.dao.ExecutablePO; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; - -public class MetadataCleanupJob extends AbstractHadoopJob { - - @SuppressWarnings("static-access") - private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused metadata").create("delete"); - - protected static final Logger log = LoggerFactory.getLogger(MetadataCleanupJob.class); - - boolean delete = false; - - private KylinConfig config = null; - - public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000l; // 2 days - public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000l; // 30 days - - /* - * (non-Javadoc) - * - * @see org.apache.hadoop.util.Tool#run(java.lang.String[]) - */ - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - log.info("----- jobs args: " + Arrays.toString(args)); - try { - options.addOption(OPTION_DELETE); - parseOptions(options, args); - - log.info("options: '" + getOptionsAsString() + "'"); - log.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'"); - delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE)); - - config = KylinConfig.getInstanceFromEnv(); - - cleanup(); - - return 0; - } catch (Exception e) { - e.printStackTrace(System.err); - throw e; - } - } - - private ResourceStore getStore() { - return ResourceStore.getStore(config); - } - - private boolean isOlderThanThreshold(long resourceTime) { - long currentTime = System.currentTimeMillis(); - - if (currentTime - resourceTime > TIME_THREADSHOLD) - return true; - return false; - } - - public void cleanup() throws Exception { - CubeManager cubeManager = CubeManager.getInstance(config); - - Set<String> activeResourceList = Sets.newHashSet(); - for (org.apache.kylin.cube.CubeInstance cube : cubeManager.listAllCubes()) { - for (org.apache.kylin.cube.CubeSegment segment : cube.getSegments()) { - activeResourceList.addAll(segment.getSnapshotPaths()); - activeResourceList.addAll(segment.getDictionaryPaths()); - } - } - - List<String> toDeleteResource = Lists.newArrayList(); - - // two level resources, snapshot tables and cube statistics - for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT }) { - ArrayList<String> snapshotTables = getStore().listResources(resourceRoot); - - if (snapshotTables != null) { - for (String snapshotTable : snapshotTables) { - ArrayList<String> snapshotNames = getStore().listResources(snapshotTable); - if (snapshotNames != null) - for (String snapshot : snapshotNames) { - if (!activeResourceList.contains(snapshot)) { - long ts = 
getStore().getResourceTimestamp(snapshot); - if (isOlderThanThreshold(ts)) - toDeleteResource.add(snapshot); - } - } - } - } - } - - // three level resources, only dictionaries - ArrayList<String> dictTables = getStore().listResources(ResourceStore.DICT_RESOURCE_ROOT); - - if (dictTables != null) { - for (String table : dictTables) { - ArrayList<String> tableColNames = getStore().listResources(table); - if (tableColNames != null) - for (String tableCol : tableColNames) { - ArrayList<String> dictionaries = getStore().listResources(tableCol); - if (dictionaries != null) - for (String dict : dictionaries) - if (!activeResourceList.contains(dict)) { - long ts = getStore().getResourceTimestamp(dict); - if (isOlderThanThreshold(ts)) - toDeleteResource.add(dict); - } - } - } - } - - // delete old and completed jobs - ExecutableDao executableDao = ExecutableDao.getInstance(KylinConfig.getInstanceFromEnv()); - List<ExecutablePO> allExecutable = executableDao.getJobs(); - for (ExecutablePO executable : allExecutable) { - long lastModified = executable.getLastModified(); - ExecutableOutputPO output = executableDao.getJobOutput(executable.getUuid()); - if (System.currentTimeMillis() - lastModified > TIME_THREADSHOLD_FOR_JOB && (output.getStatus().equals(JobStatusEnum.FINISHED.toString()) || output.getStatus().equals(JobStatusEnum.DISCARDED.toString()))) { - toDeleteResource.add(ResourceStore.EXECUTE_PATH_ROOT + "/" + executable.getUuid()); - toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + executable.getUuid()); - - for (ExecutablePO task : executable.getTasks()) { - toDeleteResource.add(ResourceStore.EXECUTE_PATH_ROOT + "/" + task.getUuid()); - toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + task.getUuid()); - } - } - } - - if (toDeleteResource.size() > 0) { - logger.info("The following resources have no reference or is too old, will be cleaned from metadata store: \n"); - - for (String s : toDeleteResource) { - logger.info(s); - if (delete == true) { - getStore().deleteResource(s); - } - } - } else { - logger.info("No resource to be cleaned up from metadata store;"); - } - - } - - public static void main(String[] args) throws Exception { - int exitCode = ToolRunner.run(new MetadataCleanupJob(), args); - System.exit(exitCode); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java deleted file mode 100644 index feb4dc4..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -import org.apache.hadoop.util.ToolRunner; - -/** - * @author George Song (ysong1) - * - */ - -public class NDCuboidJob extends CuboidJob { - - public NDCuboidJob() { - this.setMapperClass(NDCuboidMapper.class); - } - - public static void main(String[] args) throws Exception { - CuboidJob job = new NDCuboidJob(); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java deleted file mode 100644 index 8725ed8..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.hadoop.cube; - -import java.io.IOException; -import java.util.Collection; - -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.mr.KylinMapper; -import org.apache.kylin.common.util.SplittedBytes; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.common.RowKeySplitter; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.cuboid.CuboidScheduler; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author George Song (ysong1) - * - */ -public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { - - private static final Logger logger = LoggerFactory.getLogger(NDCuboidMapper.class); - - private Text outputKey = new Text(); - private String cubeName; - private String segmentName; - private CubeDesc cubeDesc; - private CuboidScheduler cuboidScheduler; - - private int handleCounter; - private int skipCounter; - - private byte[] keyBuf = new byte[4096]; - private RowKeySplitter rowKeySplitter; - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - - cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); - segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase(); - - KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration()); - - CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); - CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); - cubeDesc = cube.getDescriptor(); - - // initialize CuboidScheduler - cuboidScheduler = new CuboidScheduler(cubeDesc); - - rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); - } - - private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) { - int offset = 0; - - // cuboid id - System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, childCuboid.getBytes().length); - offset += childCuboid.getBytes().length; - - // rowkey columns - long mask = Long.highestOneBit(parentCuboid.getId()); - long parentCuboidId = parentCuboid.getId(); - long childCuboidId = childCuboid.getId(); - long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId()); - int index = 1; // skip cuboidId - for (int i = 0; i < parentCuboidIdActualLength; i++) { - if ((mask & parentCuboidId) > 0) {// if this bit position equals - // 1 - if ((mask & childCuboidId) > 0) {// if the child cuboid has this - // column - System.arraycopy(splitBuffers[index].value, 0, keyBuf, offset, splitBuffers[index].length); - offset += splitBuffers[index].length; - } - index++; - } - mask = mask >> 1; - } - - return offset; - } - - @Override - public void map(Text key, Text value, Context context) throws IOException, InterruptedException { - long cuboidId = rowKeySplitter.split(key.getBytes(), key.getLength()); - Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId); - - Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId); - - // if still empty or null - if (myChildren == null || myChildren.size() == 0) { 
-            context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L);
-            skipCounter++;
-            if (skipCounter % BatchConstants.COUNTER_MAX == 0) {
-                logger.info("Skipped " + skipCounter + " records!");
-            }
-            return;
-        }
-
-        context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L);
-
-        handleCounter++;
-        if (handleCounter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + handleCounter + " records!");
-        }
-
-        for (Long child : myChildren) {
-            Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
-            int keyLength = buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
-            outputKey.set(keyBuf, 0, keyLength);
-            context.write(outputKey, value);
-        }
-
-    }
-}
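The bit walk in buildKey() above is the heart of this mapper: it scans the parent cuboid id from its highest bit downward, advancing through the parent's column buffers, and copies a column into the child key only when the child cuboid id has the same bit set. The following standalone sketch (not part of the commit; class and parameter names are illustrative) reproduces just that projection with plain long ids and byte[] columns in place of Kylin's SplittedBytes; the real mapper additionally starts its column index at 1 because splitBuffers[0] holds the cuboid id itself.

import java.io.ByteArrayOutputStream;

public class CuboidProjectionSketch {

    // Project the parent's column values onto the child's column set.
    // parentId/childId are cuboid bitmasks; columns[i] is the value of the
    // i-th column present in the parent, in rowkey order.
    static byte[] project(long parentId, long childId, byte[][] columns) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long mask = Long.highestOneBit(parentId);
        int index = 0;
        while (mask > 0) {
            if ((mask & parentId) > 0) {        // parent has this column
                if ((mask & childId) > 0) {     // child keeps it
                    out.write(columns[index], 0, columns[index].length);
                }
                index++;                        // consume the parent's column
            }
            mask >>= 1;
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // parent 111 has columns A, B, C; child 101 keeps only A and C
        byte[][] cols = { { 'A' }, { 'B' }, { 'C' } };
        System.out.println(new String(project(0b111, 0b101, cols))); // prints AC
    }
}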
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
deleted file mode 100644
index 9c50122..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang, ysong1
- *
- */
-
-public class RangeKeyDistributionJob extends AbstractHadoopJob {
-    protected static final Logger log = LoggerFactory.getLogger(RangeKeyDistributionJob.class);
-
-    /*
-     * (non-Javadoc)
-     *
-     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-
-            parseOptions(options, args);
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            job = Job.getInstance(getConf(), jobName);
-
-            setJobClasspath(job);
-
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-            // job.getConfiguration().set("dfs.block.size", "67108864");
-
-            // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(RangeKeyDistributionMapper.class);
-            job.setMapOutputKeyClass(Text.class);
-            job.setMapOutputValueClass(LongWritable.class);
-
-            // Reducer - only one
-            job.setReducerClass(RangeKeyDistributionReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(LongWritable.class);
-            job.setNumReduceTasks(1);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity();
-            int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
-            int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
-            int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
-            job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
-            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
-            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args);
-        System.exit(exitCode);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
deleted file mode 100644
index 33baf45..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.mr.KylinMapper;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
-    private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L;
-
-    private LongWritable outputValue = new LongWritable(0);
-
-    private long bytesRead = 0;
-
-    private Text lastKey;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        lastKey = key;
-
-        int bytesLength = key.getLength() + value.getLength();
-        bytesRead += bytesLength;
-
-        if (bytesRead >= ONE_MEGA_BYTES) {
-            outputValue.set(bytesRead);
-            context.write(key, outputValue);
-
-            // reset bytesRead
-            bytesRead = 0;
-        }
-
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        if (lastKey != null) {
-            outputValue.set(bytesRead);
-            context.write(lastKey, outputValue);
-        }
-    }
-
-}
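One subtlety in the mapper above: lastKey = key; stores a reference, and Hadoop's MapReduce framework generally reuses the same Text instance across map() calls, so by cleanup() the referenced object happens to hold the final key's bytes. That is why the tail flush works, but a defensive copy is the safer pattern. A small demo of the difference (hypothetical class, not from the commit):

import org.apache.hadoop.io.Text;

public class LastKeyCopyDemo {
    public static void main(String[] args) {
        Text reused = new Text();      // stands in for the framework's reused key object
        Text aliased;
        Text copied = new Text();

        reused.set("key-1");
        aliased = reused;              // what the mapper above does
        copied.set(reused);            // defensive copy of the bytes

        reused.set("key-2");           // framework loads the next record
        System.out.println(aliased);   // key-2: the alias follows the reuse
        System.out.println(copied);    // key-1: the copy is stable
    }
}

In the mapper itself the defensive variant would be a preallocated Text field updated with lastKey.set(key) on each call.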
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
deleted file mode 100644
index b3ab4db..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.cube.model.v1.CubeDesc.CubeCapacity;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author ysong1
- *
- */
-public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
-    public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L;
-
-    private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class);
-
-    private LongWritable outputValue = new LongWritable(0);
-
-    private int minRegionCount = 1;
-    private int maxRegionCount = 500;
-    private int cut = 10;
-    private long bytesRead = 0;
-    private List<Text> gbPoints = new ArrayList<Text>();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
-            cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
-        }
-
-        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) {
-            minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN));
-        }
-
-        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) {
-            maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
-        }
-
-        logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + ", min region count =" + minRegionCount);
-    }
-
-    @Override
-    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-        for (LongWritable v : values) {
-            bytesRead += v.get();
-        }
-
-        if (bytesRead >= ONE_GIGA_BYTES) {
-            gbPoints.add(new Text(key));
-            bytesRead = 0; // reset bytesRead
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        int nRegion = Math.round((float) gbPoints.size() / (float) cut);
-        nRegion = Math.max(minRegionCount, nRegion);
-        nRegion = Math.min(maxRegionCount, nRegion);
-
-        int gbPerRegion = gbPoints.size() / nRegion;
-        gbPerRegion = Math.max(1, gbPerRegion);
-
-        System.out.println(nRegion + " regions");
-        System.out.println(gbPerRegion + " GB per region");
-
-        for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) {
-            Text key = gbPoints.get(i);
-            outputValue.set(i);
-            System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
-            context.write(key, outputValue);
-        }
-    }
-}
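To make the cleanup() arithmetic above concrete: each collected sample point stands for roughly 1 GB of data, nRegion is round(points / cut) clamped to [minRegionCount, maxRegionCount], and every gbPerRegion-th point becomes an HBase split key. A worked example with hypothetical numbers (95 sample points, the default cut of 10; the sketch is illustrative, not from the commit):

public class RegionSplitMath {
    public static void main(String[] args) {
        int gbPoints = 95;                 // hypothetical count of ~1 GB sample points
        int cut = 10;                      // target GB per region (REGION_SPLIT_SIZE)
        int minRegionCount = 1, maxRegionCount = 500;

        int nRegion = Math.round((float) gbPoints / (float) cut);           // 10
        nRegion = Math.min(maxRegionCount, Math.max(minRegionCount, nRegion));

        int gbPerRegion = Math.max(1, gbPoints / nRegion);                  // 9

        // split keys fall at sample indices 9, 18, ..., 90
        for (int i = gbPerRegion; i < gbPoints; i += gbPerRegion) {
            System.out.println("split at sample point " + i);
        }
    }
}

Here the ten split keys carve the key space into eleven ranges of about 9 GB each, the last being whatever lies past index 90.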
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
deleted file mode 100644
index 9372f18..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
-
-    @SuppressWarnings("static-access")
-    protected static final Option rowKeyStatsFilePath = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath");
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(rowKeyStatsFilePath);
-
-            parseOptions(options, args);
-
-            String statsFilePath = getOptionValue(rowKeyStatsFilePath);
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            job = Job.getInstance(getConf(), jobName);
-
-            setJobClasspath(job);
-
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-
-            // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(RowKeyDistributionCheckerMapper.class);
-            job.setMapOutputKeyClass(Text.class);
-            job.setMapOutputValueClass(LongWritable.class);
-
-            // Reducer - only one
-            job.setReducerClass(RowKeyDistributionCheckerReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(LongWritable.class);
-            job.setNumReduceTasks(1);
-
-            job.getConfiguration().set("rowKeyStatsFilePath", statsFilePath);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new RowKeyDistributionCheckerJob(), args);
-        System.exit(exitCode);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
deleted file mode 100644
index 50b0499..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.common.mr.KylinMapper;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
-    String rowKeyStatsFilePath;
-    byte[][] splitKeys;
-    Map<Text, Long> resultMap;
-    List<Text> keyList;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        rowKeyStatsFilePath = context.getConfiguration().get("rowKeyStatsFilePath");
-        splitKeys = this.getSplits(context.getConfiguration(), new Path(rowKeyStatsFilePath));
-
-        resultMap = new HashMap<Text, Long>();
-        keyList = new ArrayList<Text>();
-        for (int i = 0; i < splitKeys.length; i++) {
-            Text key = new Text(splitKeys[i]);
-            resultMap.put(key, 0L);
-            keyList.add(new Text(splitKeys[i]));
-        }
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        for (Text t : keyList) {
-            if (key.compareTo(t) < 0) {
-                Long v = resultMap.get(t);
-                long length = key.getLength() + value.getLength();
-                v += length;
-                resultMap.put(t, v);
-                break;
-            }
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        LongWritable outputValue = new LongWritable();
-        for (Entry<Text, Long> kv : resultMap.entrySet()) {
-            outputValue.set(kv.getValue());
-            context.write(kv.getKey(), outputValue);
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    public byte[][] getSplits(Configuration conf, Path path) {
-        List<byte[]> rowkeyList = new ArrayList<byte[]>();
-        SequenceFile.Reader reader = null;
-        try {
-            reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
-            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-            while (reader.next(key, value)) {
-                byte[] tmp = ((Text) key).copyBytes();
-                if (rowkeyList.contains(tmp) == false) {
-                    rowkeyList.add(tmp);
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            IOUtils.closeStream(reader);
-        }
-
-        byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
-
-        return retValue;
-    }
-}
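A caveat worth flagging in getSplits() above: rowkeyList.contains(tmp) compares byte[] references, not contents (arrays do not override equals), so duplicate keys read into fresh arrays are never filtered out. A content-based check is the usual fix; a minimal sketch (hypothetical helper, not from the commit):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ByteArrayDedup {
    // List.contains() on a List<byte[]> uses reference equality, so
    // equal-content duplicates slip through; compare with Arrays.equals.
    static boolean containsBytes(List<byte[]> list, byte[] candidate) {
        for (byte[] b : list) {
            if (Arrays.equals(b, candidate)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<byte[]> keys = new ArrayList<>();
        keys.add("row1".getBytes());
        System.out.println(keys.contains("row1".getBytes()));       // false
        System.out.println(containsBytes(keys, "row1".getBytes())); // true
    }
}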
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
deleted file mode 100644
index 83e503e..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.mr.KylinReducer;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
-    LongWritable outputKey = new LongWritable(0L);
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-    }
-
-    @Override
-    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-
-        long length = 0;
-        for (LongWritable v : values) {
-            length += v.get();
-        }
-
-        outputKey.set(length);
-        context.write(key, outputKey);
-    }
-}
