http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java deleted file mode 100644 index b1a17e7..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import java.io.IOException; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author yangli9 - */ -public class InvertedIndexJob extends AbstractHadoopJob { - protected static final Logger log = LoggerFactory.getLogger(InvertedIndexJob.class); - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_II_NAME); - options.addOption(OPTION_TABLE_NAME); - options.addOption(OPTION_OUTPUT_PATH); - parseOptions(options, args); - - job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); - String iiname = getOptionValue(OPTION_II_NAME); - String intermediateTable = getOptionValue(OPTION_TABLE_NAME); - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - - // ---------------------------------------------------------------------------- - - System.out.println("Starting: " + job.getJobName()); - - IIInstance ii = getII(iiname); - short sharding = ii.getDescriptor().getSharding(); - - setJobClasspath(job); - - setupMapper(intermediateTable); - setupReducer(output, sharding); - attachMetadata(ii); - - return waitForCompletion(job); - - } catch (Exception e) { - printUsage(options); - throw e; - } finally { - if (job != null) - cleanupTempConfFile(job.getConfiguration()); - } - } - - private IIInstance getII(String iiName) { - IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv()); - IIInstance ii = mgr.getII(iiName); - if (ii == null) - throw new IllegalArgumentException("No Inverted Index found by name " + iiName); - return ii; - } - - private void attachMetadata(IIInstance ii) throws IOException { - - Configuration conf = job.getConfiguration(); - attachKylinPropsAndMetadata(ii, conf); - - IISegment seg = ii.getFirstSegment(); - conf.set(BatchConstants.CFG_II_NAME, ii.getName()); - conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName()); - } - - private void setupMapper(String intermediateTable) throws IOException { - - String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable); - HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]); - - job.setInputFormatClass(HCatInputFormat.class); - - job.setMapperClass(InvertedIndexMapper.class); - job.setMapOutputKeyClass(LongWritable.class); - job.setMapOutputValueClass(ImmutableBytesWritable.class); - job.setPartitionerClass(InvertedIndexPartitioner.class); - } - - private void setupReducer(Path output, short sharding) throws IOException { - job.setReducerClass(InvertedIndexReducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(ImmutableBytesWritable.class); - - job.setNumReduceTasks(sharding); - - FileOutputFormat.setOutputPath(job, output); - - job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString()); - - deletePath(job.getConfiguration(), output); - } - - public static void main(String[] args) throws Exception { - InvertedIndexJob job = new InvertedIndexJob(); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } -}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java deleted file mode 100644 index b3baafe..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hive.hcatalog.data.HCatRecord; -import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; -import org.apache.hive.hcatalog.data.schema.HCatSchema; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.mr.KylinMapper; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.invertedindex.index.TableRecord; -import org.apache.kylin.invertedindex.index.TableRecordInfo; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.metadata.model.SegmentStatusEnum; - -/** - * @author yangli9 - */ -public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, LongWritable, ImmutableBytesWritable> { - - private TableRecordInfo info; - private TableRecord rec; - - private LongWritable outputKey; - private ImmutableBytesWritable outputValue; - private HCatSchema schema = null; - private List<HCatFieldSchema> fields; - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - - Configuration conf = context.getConfiguration(); - - KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf); - IIManager mgr = IIManager.getInstance(config); - IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME)); - IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW); - this.info = new TableRecordInfo(seg); - this.rec = this.info.createTableRecord(); - - outputKey = new LongWritable(); - outputValue = new ImmutableBytesWritable(rec.getBytes()); - - schema = HCatInputFormat.getTableSchema(context.getConfiguration()); - - fields = schema.getFields(); - } - - @Override - public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException { - - rec.reset(); - for (int i = 0; i < fields.size(); i++) { - Object fieldValue = record.get(i); - rec.setValueString(i, fieldValue == null ? null : fieldValue.toString()); - } - - outputKey.set(rec.getTimestamp()); - // outputValue's backing bytes array is the same as rec - - context.write(outputKey, outputValue); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java deleted file mode 100644 index 113d4ed..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.invertedindex.index.TableRecord; -import org.apache.kylin.invertedindex.index.TableRecordInfo; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.metadata.model.SegmentStatusEnum; - -/** - * @author yangli9 - */ -public class InvertedIndexPartitioner extends Partitioner<LongWritable, ImmutableBytesWritable> implements Configurable { - - private Configuration conf; - private TableRecordInfo info; - private TableRecord rec; - - @Override - public int getPartition(LongWritable key, ImmutableBytesWritable value, int numPartitions) { - rec.setBytes(value.get(), value.getOffset(), value.getLength()); - return rec.getShard(); - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - try { - KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf); - IIManager mgr = IIManager.getInstance(config); - IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME)); - IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW); - this.info = new TableRecordInfo(seg); - this.rec = this.info.createTableRecord(); - } catch (IOException e) { - throw new RuntimeException("", e); - } - } - - @Override - public Configuration getConf() { - return conf; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java deleted file mode 100644 index 37b02b8..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.mr.KylinReducer; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.invertedindex.index.Slice; -import org.apache.kylin.invertedindex.index.SliceBuilder; -import org.apache.kylin.invertedindex.index.TableRecord; -import org.apache.kylin.invertedindex.index.TableRecordInfo; -import org.apache.kylin.invertedindex.model.IIKeyValueCodec; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.metadata.model.SegmentStatusEnum; - -/** - * @author yangli9 - */ -public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> { - - private TableRecordInfo info; - private TableRecord rec; - private SliceBuilder builder; - private IIKeyValueCodec kv; - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - - Configuration conf = context.getConfiguration(); - KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf); - IIManager mgr = IIManager.getInstance(config); - IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME)); - IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW); - info = new TableRecordInfo(seg); - rec = info.createTableRecord(); - builder = null; - kv = new IIKeyValueCodec(info.getDigest()); - } - - @Override - public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) // - throws IOException, InterruptedException { - for (ImmutableBytesWritable v : values) { - rec.setBytes(v.get(), v.getOffset(), v.getLength()); - - if (builder == null) { - builder = new SliceBuilder(info, rec.getShard()); - } - - //TODO: to delete this log - System.out.println(rec.getShard() + " - " + rec); - - Slice slice = builder.append(rec); - if (slice != null) { - output(slice, context); - } - } - } - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - Slice slice = builder.close(); - if (slice != null) { - output(slice, context); - } - } - - private void output(Slice slice, Context context) throws IOException, InterruptedException { - for (Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair : kv.encodeKeyValue(slice)) { - context.write(pair.getFirst(), pair.getSecond()); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java deleted file mode 100644 index 8ac5650..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author ysong1 - * - */ -@SuppressWarnings("static-access") -public class RandomKeyDistributionJob extends AbstractHadoopJob { - - protected static final Logger log = LoggerFactory.getLogger(RandomKeyDistributionJob.class); - - static final Option OPTION_KEY_CLASS = OptionBuilder.withArgName("keyclass").hasArg().isRequired(true).withDescription("Key Class").create("keyclass"); - static final Option OPTION_REGION_MB = OptionBuilder.withArgName("regionmb").hasArg().isRequired(true).withDescription("MB per Region").create("regionmb"); - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_INPUT_PATH); - options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_KEY_CLASS); - options.addOption(OPTION_REGION_MB); - - parseOptions(options, args); - - // start job - String jobName = getOptionValue(OPTION_JOB_NAME); - job = Job.getInstance(getConf(), jobName); - - setJobClasspath(job); - - addInputDirs(getOptionValue(OPTION_INPUT_PATH), job); - - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - FileOutputFormat.setOutputPath(job, output); - - String keyClass = getOptionValue(OPTION_KEY_CLASS); - Class<?> keyClz = Class.forName(keyClass); - - int regionMB = Integer.parseInt(getOptionValue(OPTION_REGION_MB)); - - // Mapper - job.setInputFormatClass(SequenceFileInputFormat.class); - job.setMapperClass(RandomKeyDistributionMapper.class); - job.setMapOutputKeyClass(keyClz); - job.setMapOutputValueClass(NullWritable.class); - - // Reducer - only one - job.setReducerClass(RandomKeyDistributionReducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setOutputKeyClass(keyClz); - job.setOutputValueClass(NullWritable.class); - job.setNumReduceTasks(1); - - this.deletePath(job.getConfiguration(), output); - - // total map input MB - double totalMapInputMB = this.getTotalMapInputMB(); - int regionCount = Math.max(1, (int) (totalMapInputMB / regionMB)); - int mapSampleNumber = 1000; - System.out.println("Total Map Input MB: " + totalMapInputMB); - System.out.println("Region Count: " + regionCount); - - // set job configuration - job.getConfiguration().set(BatchConstants.MAPPER_SAMPLE_NUMBER, String.valueOf(mapSampleNumber)); - job.getConfiguration().set(BatchConstants.REGION_NUMBER, String.valueOf(regionCount)); - - return waitForCompletion(job); - } catch (Exception e) { - printUsage(options); - throw e; - } - } - - public static void main(String[] args) throws Exception { - int exitCode = ToolRunner.run(new RandomKeyDistributionJob(), args); - System.exit(exitCode); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java deleted file mode 100644 index 3914830..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.kylin.common.mr.KylinMapper; -import org.apache.kylin.common.util.RandomSampler; -import org.apache.kylin.job.constant.BatchConstants; - -/** - * @author ysong1 - * - */ -public class RandomKeyDistributionMapper<KEY extends Writable, VALUE> extends KylinMapper<KEY, VALUE, KEY, NullWritable> { - - private Configuration conf; - private int sampleNumber; - private List<KEY> allKeys; - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - - conf = context.getConfiguration(); - allKeys = new ArrayList<KEY>(); - sampleNumber = Integer.parseInt(conf.get(BatchConstants.MAPPER_SAMPLE_NUMBER)); - } - - @SuppressWarnings("unchecked") - @Override - public void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException { - KEY keyCopy = (KEY) ReflectionUtils.newInstance(key.getClass(), conf); - ReflectionUtils.copy(conf, key, keyCopy); - allKeys.add(keyCopy); - } - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - RandomSampler<KEY> sampler = new RandomSampler<KEY>(); - List<KEY> sampleResult = sampler.sample(allKeys, sampleNumber); - for (KEY k : sampleResult) { - context.write(k, NullWritable.get()); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java deleted file mode 100644 index b96d18b..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.kylin.common.mr.KylinReducer; -import org.apache.kylin.job.constant.BatchConstants; - -/** - * @author ysong1 - * - */ -public class RandomKeyDistributionReducer<KEY extends Writable> extends KylinReducer<KEY, NullWritable, KEY, NullWritable> { - - private Configuration conf; - private int regionNumber; - private List<KEY> allSplits; - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - - conf = context.getConfiguration(); - allSplits = new ArrayList<KEY>(); - regionNumber = Integer.parseInt(context.getConfiguration().get(BatchConstants.REGION_NUMBER)); - } - - @SuppressWarnings("unchecked") - @Override - public void reduce(KEY key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { - KEY keyCopy = (KEY) ReflectionUtils.newInstance(key.getClass(), conf); - ReflectionUtils.copy(conf, key, keyCopy); - allSplits.add(keyCopy); - } - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - int stepLength = allSplits.size() / regionNumber; - for (int i = stepLength; i < allSplits.size(); i += stepLength) { - context.write(allSplits.get(i), NullWritable.get()); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java deleted file mode 100644 index 1cc20df..0000000 --- a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.impl.threadpool; - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.job.execution.Executable; -import org.apache.kylin.job.execution.ExecutableContext; - -/** - * Created by qianzhou on 12/16/14. - */ -public class DefaultContext implements ExecutableContext { - - private final ConcurrentMap<String, Executable> runningJobs; - private final KylinConfig kylinConfig; - - public DefaultContext(ConcurrentMap<String, Executable> runningJobs, KylinConfig kylinConfig) { - this.runningJobs = runningJobs; - this.kylinConfig = kylinConfig; - } - - @Override - public Object getSchedulerContext() { - return null; - } - - @Override - public KylinConfig getConfig() { - return kylinConfig; - } - - void addRunningJob(Executable executable) { - runningJobs.put(executable.getId(), executable); - } - - void removeRunningJob(Executable executable) { - runningJobs.remove(executable.getId()); - } - - public Map<String, Executable> getRunningJobs() { - return Collections.unmodifiableMap(runningJobs); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java deleted file mode 100644 index 46592f7..0000000 --- a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.impl.threadpool; - -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.locks.InterProcessMutex; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.kylin.common.lock.JobLock; -import org.apache.kylin.job.Scheduler; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.exception.SchedulerException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.Executable; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.execution.Output; -import org.apache.kylin.job.manager.ExecutableManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Maps; - -/** - * Created by qianzhou on 12/15/14. - */ -public class DefaultScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener { - - private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock"; - - private ExecutableManager executableManager; - private FetcherRunner fetcher; - private ScheduledExecutorService fetcherPool; - private ExecutorService jobPool; - private DefaultContext context; - - private Logger logger = LoggerFactory.getLogger(DefaultScheduler.class); - private volatile boolean initialized = false; - private volatile boolean hasStarted = false; - private CuratorFramework zkClient; - private JobEngineConfig jobEngineConfig; - private InterProcessMutex sharedLock; - - private static final DefaultScheduler INSTANCE = new DefaultScheduler(); - - private DefaultScheduler() { - } - - private class FetcherRunner implements Runnable { - - @Override - synchronized public void run() { - try { - // logger.debug("Job Fetcher is running..."); - Map<String, Executable> runningJobs = context.getRunningJobs(); - if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) { - logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time"); - return; - } - - int nRunning = 0, nReady = 0, nOthers = 0; - for (final String id : executableManager.getAllJobIds()) { - if (runningJobs.containsKey(id)) { - // logger.debug("Job id:" + id + " is already running"); - nRunning++; - continue; - } - final Output output = executableManager.getOutput(id); - if ((output.getState() != ExecutableState.READY)) { - // logger.debug("Job id:" + id + " not runnable"); - nOthers++; - continue; - } - nReady++; - AbstractExecutable executable = executableManager.getJob(id); - String jobDesc = executable.toString(); - logger.info(jobDesc + " prepare to schedule"); - try { - context.addRunningJob(executable); - jobPool.execute(new JobRunner(executable)); - logger.info(jobDesc + " scheduled"); - } catch (Exception ex) { - context.removeRunningJob(executable); - logger.warn(jobDesc + " fail to schedule", ex); - } - } - logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " + nReady + " ready, " + nOthers + " others"); - } catch (Exception e) { - logger.warn("Job Fetcher caught a exception " + e); - } - } - } - - private class JobRunner implements Runnable { - - private final AbstractExecutable executable; - - public JobRunner(AbstractExecutable executable) { - this.executable = executable; - } - - @Override - public void run() { - try { - executable.execute(context); - // trigger the next step asap - fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS); - } catch (ExecuteException e) { - logger.error("ExecuteException job:" + executable.getId(), e); - } catch (Exception e) { - logger.error("unknown error execute job:" + executable.getId(), e); - } finally { - context.removeRunningJob(executable); - } - } - } - - public static DefaultScheduler getInstance() { - return INSTANCE; - } - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { - if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) { - try { - shutdown(); - } catch (SchedulerException e) { - throw new RuntimeException("failed to shutdown scheduler", e); - } - } - } - - @Override - public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException { - if (!initialized) { - initialized = true; - } else { - return; - } - - this.jobEngineConfig = jobEngineConfig; - jobLock.lock(); - - executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig()); - //load all executable, set them to a consistent status - fetcherPool = Executors.newScheduledThreadPool(1); - int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit(); - jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>()); - context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig()); - - for (AbstractExecutable executable : executableManager.getAllExecutables()) { - if (executable.getStatus() == ExecutableState.READY) { - executableManager.updateJobOutput(executable.getId(), ExecutableState.ERROR, null, "scheduler initializing work to reset job to ERROR status"); - } - } - executableManager.updateAllRunningJobsToError(); - - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - logger.debug("Closing zk connection"); - try { - shutdown(); - jobLock.unlock(); - } catch (SchedulerException e) { - logger.error("error shutdown scheduler", e); - } - } - }); - - fetcher = new FetcherRunner(); - fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS); - hasStarted = true; - } - - @Override - public void shutdown() throws SchedulerException { - fetcherPool.shutdown(); - jobPool.shutdown(); - } - - @Override - public boolean stop(AbstractExecutable executable) throws SchedulerException { - if (hasStarted) { - return true; - } else { - //TODO should try to stop this executable - return true; - } - } - - public boolean hasStarted() { - return this.hasStarted; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java deleted file mode 100644 index 1aa72f8..0000000 --- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.invertedindex; - -import org.apache.kylin.job.execution.DefaultChainedExecutable; - -/** - * Created by shaoshi on 1/15/15. - */ -public class IIJob extends DefaultChainedExecutable { - - public IIJob() { - super(); - } - - private static final String II_INSTANCE_NAME = "iiName"; - private static final String SEGMENT_ID = "segmentId"; - - void setIIName(String name) { - setParam(II_INSTANCE_NAME, name); - } - - public String getIIName() { - return getParam(II_INSTANCE_NAME); - } - - void setSegmentId(String segmentId) { - setParam(SEGMENT_ID, segmentId); - } - - public String getSegmentId() { - return getParam(SEGMENT_ID); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java deleted file mode 100644 index 68ad36b..0000000 --- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.invertedindex; - -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.TimeZone; - -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.job.AbstractJobBuilder; -import org.apache.kylin.job.common.HadoopShellExecutable; -import org.apache.kylin.job.common.MapReduceExecutable; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.hadoop.dict.CreateInvertedIndexDictionaryJob; -import org.apache.kylin.job.hadoop.hive.IIJoinedFlatTableDesc; -import org.apache.kylin.job.hadoop.invertedindex.IIBulkLoadJob; -import org.apache.kylin.job.hadoop.invertedindex.IICreateHFileJob; -import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob; -import org.apache.kylin.job.hadoop.invertedindex.IIDistinctColumnsJob; -import org.apache.kylin.job.hadoop.invertedindex.InvertedIndexJob; -import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity; - -import com.google.common.base.Preconditions; - -/** - * Created by shaoshi on 1/15/15. - */ -public final class IIJobBuilder extends AbstractJobBuilder { - - public IIJobBuilder(JobEngineConfig engineConfig) { - super(engineConfig); - } - - public IIJob buildJob(IISegment seg) { - checkPreconditions(seg); - - IIJob result = initialJob(seg, "BUILD"); - final String jobId = result.getId(); - final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc()); - final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId); - final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId); - final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/"; - final String iiPath = iiRootPath + "*"; - - final AbstractExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId); - result.addTask(intermediateHiveTableStep); - - result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId, factDistinctColumnsPath)); - - result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath)); - - result.addTask(createInvertedIndexStep(seg, intermediateHiveTableName, iiRootPath)); - - // create htable step - result.addTask(createCreateHTableStep(seg)); - - // generate hfiles step - result.addTask(createConvertToHfileStep(seg, iiPath, jobId)); - - // bulk load step - result.addTask(createBulkLoadStep(seg, jobId)); - - return result; - } - - private IIJob initialJob(IISegment seg, String type) { - IIJob result = new IIJob(); - SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss"); - format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone())); - result.setIIName(seg.getIIInstance().getName()); - result.setSegmentId(seg.getUuid()); - result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis()))); - result.setSubmitter(this.submitter); - return result; - } - - private void checkPreconditions(IISegment seg) { - Preconditions.checkNotNull(seg, "segment cannot be null"); - Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null"); - } - - private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) { - try { - String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM); - if (jobConf != null && jobConf.length() > 0) { - builder.append(" -conf ").append(jobConf); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private String getIIDistinctColumnsPath(IISegment seg, String jobUuid) { - return getJobWorkingDir(jobUuid) + "/" + seg.getIIInstance().getName() + "/ii_distinct_columns"; - } - - private String getHFilePath(IISegment seg, String jobId) { - return getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/hfile/"; - } - - private MapReduceExecutable createFactDistinctColumnsStep(IISegment seg, String factTableName, String jobId, String output) { - MapReduceExecutable result = new MapReduceExecutable(); - result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS); - result.setMapReduceJobClass(IIDistinctColumnsJob.class); - StringBuilder cmd = new StringBuilder(); - appendMapReduceParameters(cmd, engineConfig); - appendExecCmdParameters(cmd, "tablename", factTableName); - appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); - appendExecCmdParameters(cmd, "output", output); - appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getIIInstance().getName() + "_Step"); - - result.setMapReduceParams(cmd.toString()); - return result; - } - - private HadoopShellExecutable createBuildDictionaryStep(IISegment seg, String factDistinctColumnsPath) { - // base cuboid job - HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable(); - buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY); - StringBuilder cmd = new StringBuilder(); - appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); - appendExecCmdParameters(cmd, "input", factDistinctColumnsPath); - - buildDictionaryStep.setJobParams(cmd.toString()); - buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class); - return buildDictionaryStep; - } - - private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) { - // base cuboid job - MapReduceExecutable buildIIStep = new MapReduceExecutable(); - - StringBuilder cmd = new StringBuilder(); - appendMapReduceParameters(cmd, engineConfig); - - buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II); - - appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); - appendExecCmdParameters(cmd, "tablename", intermediateHiveTable); - appendExecCmdParameters(cmd, "output", iiOutputTempPath); - appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II); - - buildIIStep.setMapReduceParams(cmd.toString()); - buildIIStep.setMapReduceJobClass(InvertedIndexJob.class); - return buildIIStep; - } - - private HadoopShellExecutable createCreateHTableStep(IISegment seg) { - HadoopShellExecutable createHtableStep = new HadoopShellExecutable(); - createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); - StringBuilder cmd = new StringBuilder(); - appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); - appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier()); - - createHtableStep.setJobParams(cmd.toString()); - createHtableStep.setJobClass(IICreateHTableJob.class); - - return createHtableStep; - } - - private MapReduceExecutable createConvertToHfileStep(IISegment seg, String inputPath, String jobId) { - MapReduceExecutable createHFilesStep = new MapReduceExecutable(); - createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE); - StringBuilder cmd = new StringBuilder(); - - appendMapReduceParameters(cmd, engineConfig); - appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); - appendExecCmdParameters(cmd, "input", inputPath); - appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId)); - appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier()); - appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getIIInstance().getName() + "_Step"); - - createHFilesStep.setMapReduceParams(cmd.toString()); - createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class); - - return createHFilesStep; - } - - private HadoopShellExecutable createBulkLoadStep(IISegment seg, String jobId) { - HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable(); - bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE); - - StringBuilder cmd = new StringBuilder(); - appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId)); - appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier()); - appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); - - bulkLoadStep.setJobParams(cmd.toString()); - bulkLoadStep.setJobClass(IIBulkLoadJob.class); - - return bulkLoadStep; - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java deleted file mode 100644 index 3c79f8c..0000000 --- a/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.manager; - -import java.lang.reflect.Constructor; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.job.dao.ExecutableDao; -import org.apache.kylin.job.dao.ExecutableOutputPO; -import org.apache.kylin.job.dao.ExecutablePO; -import org.apache.kylin.job.exception.IllegalStateTranferException; -import org.apache.kylin.job.exception.PersistentException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ChainedExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.DefaultOutput; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.execution.Output; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - */ -public class ExecutableManager { - - private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class); - private static final ConcurrentHashMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>(); - @SuppressWarnings("unused") - private final KylinConfig config; - - private ExecutableDao executableDao; - - public static ExecutableManager getInstance(KylinConfig config) { - ExecutableManager r = CACHE.get(config); - if (r == null) { - r = new ExecutableManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - - } - return r; - } - - private ExecutableManager(KylinConfig config) { - logger.info("Using metadata url: " + config); - this.config = config; - this.executableDao = ExecutableDao.getInstance(config); - } - - public void addJob(AbstractExecutable executable) { - try { - executableDao.addJob(parse(executable)); - addJobOutput(executable); - } catch (PersistentException e) { - logger.error("fail to submit job:" + executable.getId(), e); - throw new RuntimeException(e); - } - } - - private void addJobOutput(AbstractExecutable executable) throws PersistentException { - ExecutableOutputPO executableOutputPO = new ExecutableOutputPO(); - executableOutputPO.setUuid(executable.getId()); - executableDao.addJobOutput(executableOutputPO); - if (executable instanceof DefaultChainedExecutable) { - for (AbstractExecutable subTask : ((DefaultChainedExecutable) executable).getTasks()) { - addJobOutput(subTask); - } - } - } - - //for ut - public void deleteJob(String jobId) { - try { - executableDao.deleteJob(jobId); - } catch (PersistentException e) { - logger.error("fail to delete job:" + jobId, e); - throw new RuntimeException(e); - } - } - - public AbstractExecutable getJob(String uuid) { - try { - return parseTo(executableDao.getJob(uuid)); - } catch (PersistentException e) { - logger.error("fail to get job:" + uuid, e); - throw new RuntimeException(e); - } - } - - public Output getOutput(String uuid) { - try { - final ExecutableOutputPO jobOutput = executableDao.getJobOutput(uuid); - Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + uuid); - return parseOutput(jobOutput); - } catch (PersistentException e) { - logger.error("fail to get job output:" + uuid, e); - throw new RuntimeException(e); - } - } - - private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) { - final DefaultOutput result = new DefaultOutput(); - result.setExtra(jobOutput.getInfo()); - result.setState(ExecutableState.valueOf(jobOutput.getStatus())); - result.setVerboseMsg(jobOutput.getContent()); - result.setLastModified(jobOutput.getLastModified()); - return result; - } - - public Map<String, Output> getAllOutputs() { - try { - final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); - HashMap<String, Output> result = Maps.newHashMap(); - for (ExecutableOutputPO jobOutput : jobOutputs) { - result.put(jobOutput.getId(), parseOutput(jobOutput)); - } - return result; - } catch (PersistentException e) { - logger.error("fail to get all job output:", e); - throw new RuntimeException(e); - } - } - - public List<AbstractExecutable> getAllExecutables() { - try { - List<AbstractExecutable> ret = Lists.newArrayList(); - for (ExecutablePO po : executableDao.getJobs()) { - try { - AbstractExecutable ae = parseTo(po); - ret.add(ae); - } catch (IllegalArgumentException e) { - logger.error("error parsing one executabePO: ", e); - } - } - return ret; - } catch (PersistentException e) { - logger.error("error get All Jobs", e); - throw new RuntimeException(e); - } - } - - public List<String> getAllJobIds() { - try { - return executableDao.getJobIds(); - } catch (PersistentException e) { - logger.error("error get All Job Ids", e); - throw new RuntimeException(e); - } - } - - public void updateAllRunningJobsToError() { - try { - final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); - for (ExecutableOutputPO executableOutputPO : jobOutputs) { - if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) { - executableOutputPO.setStatus(ExecutableState.ERROR.toString()); - executableDao.updateJobOutput(executableOutputPO); - } - } - } catch (PersistentException e) { - logger.error("error reset job status from RUNNING to ERROR", e); - throw new RuntimeException(e); - } - } - - public void resumeJob(String jobId) { - AbstractExecutable job = getJob(jobId); - if (job == null) { - return; - } - updateJobOutput(jobId, ExecutableState.READY, null, null); - if (job instanceof DefaultChainedExecutable) { - List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); - for (AbstractExecutable task : tasks) { - if (task.getStatus() == ExecutableState.ERROR) { - updateJobOutput(task.getId(), ExecutableState.READY, null, null); - break; - } - } - } - } - - public void discardJob(String jobId) { - AbstractExecutable job = getJob(jobId); - if (job instanceof DefaultChainedExecutable) { - List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); - for (AbstractExecutable task : tasks) { - if (!task.getStatus().isFinalState()) { - updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null); - } - } - } - updateJobOutput(jobId, ExecutableState.DISCARDED, null, null); - } - - public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) { - try { - final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); - Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + jobId); - ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus()); - if (newStatus != null && oldStatus != newStatus) { - if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) { - throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus); - } - jobOutput.setStatus(newStatus.toString()); - } - if (info != null) { - jobOutput.setInfo(info); - } - if (output != null) { - jobOutput.setContent(output); - } - executableDao.updateJobOutput(jobOutput); - logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus); - } catch (PersistentException e) { - logger.error("error change job:" + jobId + " to " + newStatus.toString()); - throw new RuntimeException(e); - } - } - - //for migration only - //TODO delete when migration finished - public void resetJobOutput(String jobId, ExecutableState state, String output) { - try { - final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); - jobOutput.setStatus(state.toString()); - if (output != null) { - jobOutput.setContent(output); - } - executableDao.updateJobOutput(jobOutput); - } catch (PersistentException e) { - throw new RuntimeException(e); - } - } - - public void addJobInfo(String id, Map<String, String> info) { - if (info == null) { - return; - } - try { - ExecutableOutputPO output = executableDao.getJobOutput(id); - Preconditions.checkArgument(output != null, "there is no related output for job id:" + id); - output.getInfo().putAll(info); - executableDao.updateJobOutput(output); - } catch (PersistentException e) { - logger.error("error update job info, id:" + id + " info:" + info.toString()); - throw new RuntimeException(e); - } - } - - public void addJobInfo(String id, String key, String value) { - Map<String, String> info = Maps.newHashMap(); - info.put(key, value); - addJobInfo(id, info); - } - - private static ExecutablePO parse(AbstractExecutable executable) { - ExecutablePO result = new ExecutablePO(); - result.setName(executable.getName()); - result.setUuid(executable.getId()); - result.setType(executable.getClass().getName()); - result.setParams(executable.getParams()); - if (executable instanceof ChainedExecutable) { - List<ExecutablePO> tasks = Lists.newArrayList(); - for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) { - tasks.add(parse(task)); - } - result.setTasks(tasks); - } - return result; - } - - private static AbstractExecutable parseTo(ExecutablePO executablePO) { - if (executablePO == null) { - return null; - } - String type = executablePO.getType(); - try { - Class<? extends AbstractExecutable> clazz = ClassUtil.forName(type, AbstractExecutable.class); - Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor(); - AbstractExecutable result = constructor.newInstance(); - result.setId(executablePO.getUuid()); - result.setName(executablePO.getName()); - result.setParams(executablePO.getParams()); - List<ExecutablePO> tasks = executablePO.getTasks(); - if (tasks != null && !tasks.isEmpty()) { - Preconditions.checkArgument(result instanceof ChainedExecutable); - for (ExecutablePO subTask : tasks) { - ((ChainedExecutable) result).addTask(parseTo(subTask)); - } - } - return result; - } catch (ReflectiveOperationException e) { - throw new IllegalArgumentException("cannot parse this job:" + executablePO.getId(), e); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java deleted file mode 100644 index b6e5af5..0000000 --- a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.tools; - -import java.io.IOException; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by honma on 11/11/14. - */ -public class CleanHtableCLI extends AbstractHadoopJob { - - protected static final Logger log = LoggerFactory.getLogger(CleanHtableCLI.class); - - String tableName; - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - try { - - clean(); - - return 0; - } catch (Exception e) { - printUsage(options); - throw e; - } - } - - private void clean() throws IOException { - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); - - for (HTableDescriptor descriptor : hbaseAdmin.listTables()) { - String name = descriptor.getNameAsString().toLowerCase(); - if (name.startsWith("kylin") || name.startsWith("_kylin")) { - String x = descriptor.getValue("KYLIN_HOST"); - System.out.println("table name " + descriptor.getNameAsString() + " host: " + x); - System.out.println(descriptor); - System.out.println(); - } - } - hbaseAdmin.close(); - } - - public static void main(String[] args) throws Exception { - int exitCode = ToolRunner.run(new CleanHtableCLI(), args); - System.exit(exitCode); - } -}
