KYLIN-1116 Use local dictionary for InvertedIndex batch building Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/82bfa924 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/82bfa924 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/82bfa924
Branch: refs/heads/KYLIN-1116 Commit: 82bfa924c48db87297be6a7720357b500d48511e Parents: f8590d2 Author: shaofengshi <shaofeng...@apache.org> Authored: Fri Nov 6 16:24:29 2015 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Fri Nov 6 16:24:29 2015 +0800 ---------------------------------------------------------------------- .../apache/kylin/job/BuildIIWithStreamTest.java | 15 +- .../kylin/job/hadoop/invertedindex/IITest.java | 11 +- .../kylin/common/util/StreamingBatch.java | 59 +++++ .../kylin/common/util/StreamingMessage.java | 43 ++++ .../realization/IRealizationSegment.java | 1 - .../kylin/engine/mr/JobBuilderSupport.java | 8 +- .../engine/mr/common/AbstractHadoopJob.java | 3 - .../mr/invertedindex/BatchIIJobBuilder.java | 61 +----- .../CreateInvertedIndexDictionaryJob.java | 70 ------ .../IIDistinctColumnsCombiner.java | 58 ----- .../mr/invertedindex/IIDistinctColumnsJob.java | 138 ------------ .../invertedindex/IIDistinctColumnsMapper.java | 75 ------- .../invertedindex/IIDistinctColumnsReducer.java | 77 ------- .../engine/mr/invertedindex/IIJobBuilder.java | 219 ------------------- .../mr/invertedindex/InvertedIndexJob.java | 10 +- .../mr/invertedindex/InvertedIndexMapper.java | 32 +-- .../mr/invertedindex/InvertedIndexReducer.java | 89 +++++--- .../UpdateIIInfoAfterBuildStep.java | 84 +++++++ .../UpdateInvertedIndexInfoAfterBuildStep.java | 93 -------- .../kylin/engine/streaming/IStreamingInput.java | 2 + .../streaming/OneOffStreamingBuilder.java | 1 + .../kylin/engine/streaming/StreamingBatch.java | 61 ------ .../engine/streaming/StreamingBatchBuilder.java | 1 + .../engine/streaming/StreamingMessage.java | 43 ---- .../streaming/cube/StreamingCubeBuilder.java | 4 +- .../streaming/invertedindex/SliceBuilder.java | 81 ------- .../invertedindex/test_kylin_ii_inner_join.json | 45 +--- .../invertedindex/test_kylin_ii_left_join.json | 45 +--- .../invertedindex/test_streaming_table_ii.json | 22 +- .../apache/kylin/invertedindex/IIManager.java | 50 +---- .../apache/kylin/invertedindex/IISegment.java | 51 +---- .../kylin/invertedindex/index/SliceBuilder.java | 77 +++++++ .../invertedindex/IIInstanceTest.java | 5 - .../apache/kylin/source/hive/HiveMRInput.java | 3 + .../kylin/source/kafka/KafkaStreamingInput.java | 4 +- .../kylin/source/kafka/StreamingParser.java | 2 +- .../source/kafka/StringStreamingParser.java | 2 +- .../source/kafka/TimedJsonStreamParser.java | 2 +- .../kafka/diagnose/KafkaInputAnalyzer.java | 2 +- .../kylin/source/kafka/util/KafkaUtils.java | 2 +- .../storage/hbase/ii/IICreateHFileMapper.java | 19 +- .../storage/hbase/ii/IICreateHTableJob.java | 31 ++- .../storage/hbase/steps/HBaseMROutput.java | 2 +- .../kylin/storage/hbase/steps/HBaseMRSteps.java | 18 +- .../hbase/util/DeployCoprocessorCLI.java | 3 +- .../storage/hbase/util/StorageCleanupJob.java | 2 - .../hbase/common/TsConditionEraserTest.java | 6 +- .../endpoint/EndpointAggregationTest.java | 27 ++- .../endpoint/TableRecordInfoTest.java | 5 + .../endpoint/TsConditionExtractorTest.java | 5 + 50 files changed, 469 insertions(+), 1300 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java index b64a7c5..2d40b09 100644 --- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java +++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java @@ -51,8 +51,8 @@ import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.Pair; import org.apache.kylin.engine.mr.JobBuilderSupport; -import org.apache.kylin.engine.streaming.StreamingBatch; -import org.apache.kylin.engine.streaming.StreamingMessage; +import org.apache.kylin.common.util.StreamingBatch; +import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.IISegment; @@ -61,7 +61,7 @@ import org.apache.kylin.invertedindex.model.IIDesc; import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc; import org.apache.kylin.invertedindex.model.IIKeyValueCodec; import org.apache.kylin.invertedindex.model.IIRow; -import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder; +import org.apache.kylin.invertedindex.index.SliceBuilder; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; @@ -196,14 +196,13 @@ public class BuildIIWithStreamTest { int count = sorted.size(); ArrayList<StreamingMessage> messages = Lists.newArrayList(); for (String[] row : sorted) { - if (messages.size() < iiDesc.getSliceSize()) { - messages.add(parse(row)); - } else { + messages.add((parse(row))); + if (messages.size() >= iiDesc.getSliceSize()) { build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable); - messages = Lists.newArrayList(); - messages.add((parse(row))); + messages.clear(); } } + if (!messages.isEmpty()) { build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java index 200156a..a393ce3 100644 --- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java +++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java @@ -23,10 +23,11 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.common.util.FIFOIterable; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.engine.streaming.StreamingBatch; -import org.apache.kylin.engine.streaming.StreamingMessage; +import org.apache.kylin.common.util.StreamingBatch; +import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.invertedindex.index.Slice; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.invertedindex.index.TableRecordInfoDigest; @@ -35,7 +36,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec; import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState; import org.apache.kylin.invertedindex.model.IIRow; import org.apache.kylin.invertedindex.model.KeyValueCodec; -import org.apache.kylin.engine.streaming.invertedindex.SliceBuilder; +import org.apache.kylin.invertedindex.index.SliceBuilder; import org.apache.kylin.metadata.filter.ColumnTupleFilter; import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.ConstantTupleFilter; @@ -146,6 +147,10 @@ public class IITest extends LocalFileMetadataTestCase { @Test public void IIEndpointTest() { TableRecordInfo info = new TableRecordInfo(ii.getDescriptor()); + if (ii.getFirstSegment() == null) { + IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis()); + ii.getSegments().add(segment); + } CoprocessorRowType type = CoprocessorRowType.fromTableRecordInfo(info, ii.getFirstSegment().getColumns()); CoprocessorProjector projector = CoprocessorProjector.makeForEndpoint(info, Collections.singletonList(ii.getDescriptor().findColumnRef("default.test_kylin_fact", "lstg_format_name"))); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java new file mode 100644 index 0000000..ae5b72f --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingBatch.java @@ -0,0 +1,59 @@ +/* + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / + */ +package org.apache.kylin.common.util; + +import java.util.List; + +/** + */ +public final class StreamingBatch { + + private final List<StreamingMessage> messages; + + private final Pair<Long, Long> timeRange; + + public StreamingBatch(List<StreamingMessage> messages, Pair<Long, Long> timeRange) { + this.messages = messages; + this.timeRange = timeRange; + } + + public List<StreamingMessage> getMessages() { + return messages; + } + + public Pair<Long, Long> getTimeRange() { + return timeRange; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java new file mode 100644 index 0000000..2c150ff --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java @@ -0,0 +1,43 @@ +package org.apache.kylin.common.util; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + */ +public class StreamingMessage { + + private final List<String> data; + + private long offset; + + private long timestamp; + + private Map<String, Object> params; + + public static final StreamingMessage EOF = new StreamingMessage(Collections.<String> emptyList(), 0L, 0L, Collections.<String, Object> emptyMap()); + + public StreamingMessage(List<String> data, long offset, long timestamp, Map<String, Object> params) { + this.data = data; + this.offset = offset; + this.timestamp = timestamp; + this.params = params; + } + + public final List<String> getData() { + return data; + } + + public final long getOffset() { + return offset; + } + + public final long getTimestamp() { + return timestamp; + } + + public Map<String, Object> getParams() { + return params; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java index afab86b..3ac82a0 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationSegment.java @@ -4,7 +4,6 @@ import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; /** - * Created by shaoshi on 10/30/15. */ public interface IRealizationSegment extends IBuildable { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index 8c770f9..e3b07d8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -25,13 +25,12 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; -import org.apache.kylin.engine.mr.invertedindex.UpdateInvertedIndexInfoAfterBuildStep; +import org.apache.kylin.engine.mr.invertedindex.UpdateIIInfoAfterBuildStep; import org.apache.kylin.engine.mr.steps.CreateDictionaryJob; import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob; import org.apache.kylin.engine.mr.steps.MergeDictionaryStep; import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep; import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep; -import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; @@ -125,11 +124,10 @@ public class JobBuilderSupport { - public UpdateInvertedIndexInfoAfterBuildStep createUpdateInvertedIndexInfoAfterBuildStep(String jobId) { - final UpdateInvertedIndexInfoAfterBuildStep updateIIInfoStep = new UpdateInvertedIndexInfoAfterBuildStep(); + public UpdateIIInfoAfterBuildStep createUpdateIIInfoAfterBuildStep(String jobId) { + final UpdateIIInfoAfterBuildStep updateIIInfoStep = new UpdateIIInfoAfterBuildStep(); updateIIInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_II_INFO); updateIIInfoStep.setInvertedIndexName(seg.getRealization().getName()); - updateIIInfoStep.setSegmentId(seg.getUuid()); updateIIInfoStep.setJobId(jobId); return updateIIInfoStep; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 8782bbe..366a730 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -337,9 +337,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { TableDesc table = metaMgr.getTableDesc(tableName); dumpList.add(table.getResourcePath()); } - for (IISegment segment : ii.getSegments()) { - dumpList.addAll(segment.getDictionaryPaths()); - } attachKylinPropsAndMetadata(dumpList, conf); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java index 97e27d0..e7501b8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java @@ -18,30 +18,16 @@ package org.apache.kylin.engine.mr.invertedindex; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; import org.apache.kylin.engine.mr.IMROutput; -import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.engine.mr.MRUtil; -import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; -import org.apache.kylin.engine.mr.steps.BaseCuboidJob; -import org.apache.kylin.engine.mr.steps.NDCuboidJob; import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc; import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.realization.IRealization; -import org.apache.kylin.metadata.realization.IRealizationSegment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - public class BatchIIJobBuilder extends JobBuilderSupport { private static final Logger logger = LoggerFactory.getLogger(BatchIIJobBuilder.class); @@ -64,54 +50,20 @@ public class BatchIIJobBuilder extends JobBuilderSupport { final String iiRootPath = getRealizationRootPath(jobId) + "/"; // Phase 1: Create Flat Table inputSide.addStepPhase1_CreateFlatTable(result); - - final String intermediateTableIdentity = seg.getJoinedFlatTableDesc().getTableName(); - // Phase 2: Build Dictionary - result.addTask(createIIFactDistinctColumnsStep(seg, intermediateTableIdentity, getFactDistinctColumnsPath(jobId))); - result.addTask(createIIBuildDictionaryStep(seg, getFactDistinctColumnsPath(jobId))); - - // Phase 3: Build Cube - result.addTask(createInvertedIndexStep((IISegment)seg, intermediateTableIdentity, iiRootPath)); + + // Phase 2: Build Inverted Index + result.addTask(createInvertedIndexStep((IISegment)seg, iiRootPath)); outputSide.addStepPhase3_BuildII(result, iiRootPath); - // Phase 4: Update Metadata & Cleanup - result.addTask(createUpdateInvertedIndexInfoAfterBuildStep(jobId)); + // Phase 3: Update Metadata & Cleanup + result.addTask(createUpdateIIInfoAfterBuildStep(jobId)); inputSide.addStepPhase4_Cleanup(result); outputSide.addStepPhase4_Cleanup(result); return result; } - private MapReduceExecutable createIIFactDistinctColumnsStep(IRealizationSegment seg, String factTableName, String output) { - MapReduceExecutable result = new MapReduceExecutable(); - result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS); - result.setMapReduceJobClass(IIDistinctColumnsJob.class); - StringBuilder cmd = new StringBuilder(); - appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc()); - appendExecCmdParameters(cmd, "tablename", factTableName); - appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName()); - appendExecCmdParameters(cmd, "output", output); - appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step"); - - result.setMapReduceParams(cmd.toString()); - return result; - } - - private HadoopShellExecutable createIIBuildDictionaryStep(IRealizationSegment seg, String factDistinctColumnsPath) { - // base cuboid job - HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable(); - buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY); - StringBuilder cmd = new StringBuilder(); - appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName()); - appendExecCmdParameters(cmd, "input", factDistinctColumnsPath); - - buildDictionaryStep.setJobParams(cmd.toString()); - buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class); - return buildDictionaryStep; - } - - private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) { - // base cuboid job + private MapReduceExecutable createInvertedIndexStep(IISegment seg, String iiOutputTempPath) { MapReduceExecutable buildIIStep = new MapReduceExecutable(); StringBuilder cmd = new StringBuilder(); @@ -120,7 +72,6 @@ public class BatchIIJobBuilder extends JobBuilderSupport { buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II); appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName()); - appendExecCmdParameters(cmd, "tablename", intermediateHiveTable); appendExecCmdParameters(cmd, "output", iiOutputTempPath); appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java deleted file mode 100644 index 39d74b4..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/CreateInvertedIndexDictionaryJob.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.mr.invertedindex; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.dict.DistinctColumnValuesProvider; -import org.apache.kylin.engine.mr.DFSFileTable; -import org.apache.kylin.engine.mr.common.AbstractHadoopJob; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.ReadableTable; - -/** - */ -public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob { - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_II_NAME); - options.addOption(OPTION_INPUT_PATH); - parseOptions(options, args); - - final String iiname = getOptionValue(OPTION_II_NAME); - final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH); - final KylinConfig config = KylinConfig.getInstanceFromEnv(); - - IIManager mgr = IIManager.getInstance(config); - IIInstance ii = mgr.getII(iiname); - - mgr.buildInvertedIndexDictionary(ii.getFirstSegment(), new DistinctColumnValuesProvider() { - @Override - public ReadableTable getDistinctValuesFor(TblColRef col) { - return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1); - } - }); - return 0; - } catch (Exception e) { - printUsage(options); - throw e; - } - } - - public static void main(String[] args) throws Exception { - int exitCode = ToolRunner.run(new CreateInvertedIndexDictionaryJob(), args); - System.exit(exitCode); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java deleted file mode 100644 index 651ad63..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsCombiner.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.mr.invertedindex; - -import java.io.IOException; -import java.util.HashSet; - -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.engine.mr.KylinReducer; - -/** - * @author yangli9 - */ -public class IIDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text, ShortWritable, Text> { - - private Text outputValue = new Text(); - - @Override - protected void setup(Context context) throws IOException { - super.bindCurrentConfiguration(context.getConfiguration()); - - } - - @Override - public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { - - HashSet<ByteArray> set = new HashSet<ByteArray>(); - for (Text textValue : values) { - ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength())); - set.add(value); - } - - for (ByteArray value : set) { - outputValue.set(value.array(), value.offset(), value.length()); - context.write(key, outputValue); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java deleted file mode 100644 index fe968b1..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsJob.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.mr.invertedindex; - -import java.io.IOException; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.engine.mr.IMRInput; -import org.apache.kylin.engine.mr.MRUtil; -import org.apache.kylin.engine.mr.common.AbstractHadoopJob; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc; -import org.apache.kylin.metadata.model.IntermediateColumnDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author yangli9 - */ -public class IIDistinctColumnsJob extends AbstractHadoopJob { - protected static final Logger logger = LoggerFactory.getLogger(IIDistinctColumnsJob.class); - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_TABLE_NAME); - options.addOption(OPTION_II_NAME); - options.addOption(OPTION_OUTPUT_PATH); - parseOptions(options, args); - - job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); - String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase(); - String iiName = getOptionValue(OPTION_II_NAME); - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - - // ---------------------------------------------------------------------------- - - logger.info("Starting: " + job.getJobName() + " on table " + tableName); - - IIManager iiMgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv()); - IIInstance ii = iiMgr.getII(iiName); - job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName); - job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(ii)); - - setJobClasspath(job); - - - setupMapper(ii.getFirstSegment()); - setupReducer(output); - - Configuration conf = job.getConfiguration(); - conf.set(BatchConstants.CFG_II_NAME, ii.getName()); - attachKylinPropsAndMetadata(ii, conf); - return waitForCompletion(job); - - } catch (Exception e) { - printUsage(options); - throw e; - } - - } - - private String getColumns(IIInstance ii) { - IIJoinedFlatTableDesc iiflat = new IIJoinedFlatTableDesc(ii.getDescriptor()); - StringBuilder buf = new StringBuilder(); - for (IntermediateColumnDesc col : iiflat.getColumnList()) { - if (buf.length() > 0) - buf.append(","); - buf.append(col.getColumnName()); - } - return buf.toString(); - } - - private void setupMapper(IISegment segment) throws IOException { - - IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat(); - flatTableInputFormat.configureJob(job); - - job.setMapperClass(IIDistinctColumnsMapper.class); - job.setCombinerClass(IIDistinctColumnsCombiner.class); - job.setMapOutputKeyClass(ShortWritable.class); - job.setMapOutputValueClass(Text.class); - } - - private void setupReducer(Path output) throws IOException { - job.setReducerClass(IIDistinctColumnsReducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(Text.class); - - FileOutputFormat.setOutputPath(job, output); - job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString()); - - job.setNumReduceTasks(1); - - deletePath(job.getConfiguration(), output); - } - - public static void main(String[] args) throws Exception { - IIDistinctColumnsJob job = new IIDistinctColumnsJob(); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java deleted file mode 100644 index c431ecd..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsMapper.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.mr.invertedindex; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.engine.mr.IMRInput; -import org.apache.kylin.engine.mr.KylinMapper; -import org.apache.kylin.engine.mr.MRUtil; -import org.apache.kylin.engine.mr.common.AbstractHadoopJob; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; - -/** - * @author yangli9 - */ -public class IIDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, Object, ShortWritable, Text> { - - private ShortWritable outputKey = new ShortWritable(); - private Text outputValue = new Text(); - - protected IMRInput.IMRTableInputFormat flatTableInputFormat; - - @Override - protected void setup(Context context) throws IOException { - super.bindCurrentConfiguration(context.getConfiguration()); - Configuration conf = context.getConfiguration(); - KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); - - String iiName = conf.get(BatchConstants.CFG_II_NAME); - IIInstance iiInstance = IIManager.getInstance(config).getII(iiName); - IISegment seg = iiInstance.getFirstSegment(); - flatTableInputFormat = MRUtil.getBatchCubingInputSide(seg).getFlatTableInputFormat(); - } - - @Override - public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException { - - String[] row = flatTableInputFormat.parseMapperInput(record); - - for (short i = 0; i < row.length; i++) { - outputKey.set(i); - if (row[i] == null) - continue; - byte[] bytes = Bytes.toBytes(row[i].toString()); - outputValue.set(bytes, 0, bytes.length); - context.write(outputKey, outputValue); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java deleted file mode 100644 index d50385f..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIDistinctColumnsReducer.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.mr.invertedindex; - -import java.io.IOException; -import java.util.HashSet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.engine.mr.KylinReducer; -import org.apache.kylin.engine.mr.common.BatchConstants; - -/** - * @author yangli9 - */ -public class IIDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> { - - private String[] columns; - - @Override - protected void setup(Context context) throws IOException { - super.bindCurrentConfiguration(context.getConfiguration()); - - Configuration conf = context.getConfiguration(); - this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(","); - } - - @Override - public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { - String columnName = columns[key.get()]; - - HashSet<ByteArray> set = new HashSet<ByteArray>(); - for (Text textValue : values) { - ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength())); - set.add(value); - } - - Configuration conf = context.getConfiguration(); - FileSystem fs = FileSystem.get(conf); - String outputPath = conf.get(BatchConstants.OUTPUT_PATH); - FSDataOutputStream out = fs.create(new Path(outputPath, columnName)); - - try { - for (ByteArray value : set) { - out.write(value.array(), value.offset(), value.length()); - out.write('\n'); - } - } finally { - out.close(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java deleted file mode 100644 index 18d3001..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/IIJobBuilder.java +++ /dev/null @@ -1,219 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -//*/ -// -//package org.apache.kylin.engine.mr.invertedindex; -// -//import java.io.IOException; -//import java.text.SimpleDateFormat; -//import java.util.Date; -//import java.util.TimeZone; -// -//import org.apache.kylin.engine.mr.common.HadoopShellExecutable; -//import org.apache.kylin.engine.mr.common.MapReduceExecutable; -//import org.apache.kylin.invertedindex.IISegment; -//import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc; -//import org.apache.kylin.job.constant.ExecutableConstants; -//import org.apache.kylin.job.engine.JobEngineConfig; -//import org.apache.kylin.job.execution.AbstractExecutable; -//import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity; -// -//import com.google.common.base.Preconditions; -// -///** -// */ -//public final class IIJobBuilder { -// -// final JobEngineConfig engineConfig; -// -// public IIJobBuilder(JobEngineConfig engineConfig) { -// this.engineConfig = engineConfig; -// } -// -// public IIJob buildJob(IISegment seg, String submitter) { -// checkPreconditions(seg); -// -// IIJob result = initialJob(seg, "BUILD", submitter); -// final String jobId = result.getId(); -// final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc()); -// final String intermediateTableIdentity = getIntermediateTableIdentity(intermediateTableDesc); -// final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId); -// final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/"; -// final String iiPath = iiRootPath + "*"; -// -// final AbstractExecutable intermediateHiveTableStep = createFlatHiveTableStep(intermediateTableDesc, jobId); -// result.addTask(intermediateHiveTableStep); -// -// result.addTask(createFactDistinctColumnsStep(seg, intermediateTableIdentity, jobId, factDistinctColumnsPath)); -// -// result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath)); -// -// result.addTask(createInvertedIndexStep(seg, intermediateTableIdentity, iiRootPath)); -// -// // create htable step -// result.addTask(createCreateHTableStep(seg)); -// -// // generate hfiles step -// result.addTask(createConvertToHfileStep(seg, iiPath, jobId)); -// -// // bulk load step -// result.addTask(createBulkLoadStep(seg, jobId)); -// -// return result; -// } -// -// private IIJob initialJob(IISegment seg, String type, String submitter) { -// IIJob result = new IIJob(); -// SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss"); -// format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone())); -// result.setIIName(seg.getIIInstance().getName()); -// result.setSegmentId(seg.getUuid()); -// result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis()))); -// result.setSubmitter(submitter); -// return result; -// } -// -// private void checkPreconditions(IISegment seg) { -// Preconditions.checkNotNull(seg, "segment cannot be null"); -// Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null"); -// } -// -// private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) { -// try { -// String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM); -// if (jobConf != null && jobConf.length() > 0) { -// builder.append(" -conf ").append(jobConf); -// } -// } catch (IOException e) { -// throw new RuntimeException(e); -// } -// } -// -// private String getIIDistinctColumnsPath(IISegment seg, String jobUuid) { -// return getJobWorkingDir(jobUuid) + "/" + seg.getIIInstance().getName() + "/ii_distinct_columns"; -// } -// -// private String getHFilePath(IISegment seg, String jobId) { -// return getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/hfile/"; -// } -// -// private MapReduceExecutable createFactDistinctColumnsStep(IISegment seg, String factTableName, String jobId, String output) { -// MapReduceExecutable result = new MapReduceExecutable(); -// result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS); -// result.setMapReduceJobClass(IIDistinctColumnsJob.class); -// StringBuilder cmd = new StringBuilder(); -// appendMapReduceParameters(cmd, engineConfig); -// appendExecCmdParameters(cmd, "tablename", factTableName); -// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); -// appendExecCmdParameters(cmd, "output", output); -// appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getIIInstance().getName() + "_Step"); -// -// result.setMapReduceParams(cmd.toString()); -// return result; -// } -// -// private HadoopShellExecutable createBuildDictionaryStep(IISegment seg, String factDistinctColumnsPath) { -// // base cuboid job -// HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable(); -// buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY); -// StringBuilder cmd = new StringBuilder(); -// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); -// appendExecCmdParameters(cmd, "input", factDistinctColumnsPath); -// -// buildDictionaryStep.setJobParams(cmd.toString()); -// buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class); -// return buildDictionaryStep; -// } -// -// private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) { -// // base cuboid job -// MapReduceExecutable buildIIStep = new MapReduceExecutable(); -// -// StringBuilder cmd = new StringBuilder(); -// appendMapReduceParameters(cmd, engineConfig); -// -// buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II); -// -// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); -// appendExecCmdParameters(cmd, "tablename", intermediateHiveTable); -// appendExecCmdParameters(cmd, "output", iiOutputTempPath); -// appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II); -// -// buildIIStep.setMapReduceParams(cmd.toString()); -// buildIIStep.setMapReduceJobClass(InvertedIndexJob.class); -// return buildIIStep; -// } -// -// private HadoopShellExecutable createCreateHTableStep(IISegment seg) { -// HadoopShellExecutable createHtableStep = new HadoopShellExecutable(); -// createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); -// StringBuilder cmd = new StringBuilder(); -// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); -// appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier()); -// -// createHtableStep.setJobParams(cmd.toString()); -// createHtableStep.setJobClass(IICreateHTableJob.class); -// -// return createHtableStep; -// } -// -// private MapReduceExecutable createConvertToHfileStep(IISegment seg, String inputPath, String jobId) { -// MapReduceExecutable createHFilesStep = new MapReduceExecutable(); -// createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE); -// StringBuilder cmd = new StringBuilder(); -// -// appendMapReduceParameters(cmd, engineConfig); -// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); -// appendExecCmdParameters(cmd, "input", inputPath); -// appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId)); -// appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier()); -// appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getIIInstance().getName() + "_Step"); -// -// createHFilesStep.setMapReduceParams(cmd.toString()); -// createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class); -// -// return createHFilesStep; -// } -// -// private HadoopShellExecutable createBulkLoadStep(IISegment seg, String jobId) { -// HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable(); -// bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE); -// -// StringBuilder cmd = new StringBuilder(); -// appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId)); -// appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier()); -// appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); -// -// bulkLoadStep.setJobParams(cmd.toString()); -// bulkLoadStep.setJobClass(IIBulkLoadJob.class); -// -// return bulkLoadStep; -// -// } -// -// private StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) { -// return buf.append(" -").append(paraName).append(" ").append(paraValue); -// } -// -// private String getJobWorkingDir(String uuid) { -// return engineConfig.getHdfsWorkingDirectory() + "kylin-" + uuid; -// } -// -// private String getIntermediateTableIdentity(IIJoinedFlatTableDesc intermediateTableDesc) { -// return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName(); -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java index bcae524..5191aca 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; @@ -44,7 +45,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author yangli9 */ public class InvertedIndexJob extends AbstractHadoopJob { protected static final Logger logger = LoggerFactory.getLogger(InvertedIndexJob.class); @@ -56,13 +56,11 @@ public class InvertedIndexJob extends AbstractHadoopJob { try { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_II_NAME); - options.addOption(OPTION_TABLE_NAME); options.addOption(OPTION_OUTPUT_PATH); parseOptions(options, args); job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); String iiname = getOptionValue(OPTION_II_NAME); - String intermediateTable = getOptionValue(OPTION_TABLE_NAME); Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); // ---------------------------------------------------------------------------- @@ -111,17 +109,11 @@ public class InvertedIndexJob extends AbstractHadoopJob { private void setupMapper(IISegment segment) throws IOException { -// String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable); -// HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]); -// -// job.setInputFormatClass(HCatInputFormat.class); IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat(); flatTableInputFormat.configureJob(job); - job.setMapperClass(InvertedIndexMapper.class); job.setMapOutputKeyClass(LongWritable.class); - job.setMapOutputValueClass(ImmutableBytesWritable.class); job.setPartitionerClass(InvertedIndexPartitioner.class); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java index 88249ed..a1251a3 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexMapper.java @@ -18,13 +18,11 @@ package org.apache.kylin.engine.mr.invertedindex; -import java.io.IOException; -import java.util.List; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.MRUtil; @@ -33,20 +31,19 @@ import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.invertedindex.index.TableRecord; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import java.io.IOException; + /** * @author yangli9 */ -public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongWritable, ImmutableBytesWritable> { +public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongWritable, Writable> { private TableRecordInfo info; - private TableRecord rec; private LongWritable outputKey; - private ImmutableBytesWritable outputValue; private IMRInput.IMRTableInputFormat flatTableInputFormat; @Override @@ -60,10 +57,8 @@ public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongW IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME)); IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW); this.info = new TableRecordInfo(seg); - this.rec = this.info.createTableRecord(); outputKey = new LongWritable(); - outputValue = new ImmutableBytesWritable(rec.getBytes()); flatTableInputFormat = MRUtil.getBatchCubingInputSide(ii.getFirstSegment()).getFlatTableInputFormat(); } @@ -71,17 +66,12 @@ public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, Object, LongW @Override public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException { - String[] row = flatTableInputFormat.parseMapperInput(record); - rec.reset(); - for (int i = 0; i < row.length; i++) { - Object fieldValue = row[i]; - if (fieldValue != null) - rec.setValueString(i, fieldValue.toString()); - } - - outputKey.set(rec.getTimestamp()); - // outputValue's backing bytes array is the same as rec + Writable writableRecord = (Writable) record; + String[] row = flatTableInputFormat.parseMapperInput(writableRecord); + String timestampString = row[info.getTimestampColumn()]; - context.write(outputKey, outputValue); + outputKey.set(DateFormat.stringToMillis(timestampString)); + // + context.write(outputKey, writableRecord); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java index 7644456..56c0b9e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexReducer.java @@ -18,35 +18,45 @@ package org.apache.kylin.engine.mr.invertedindex; -import java.io.IOException; - +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.StreamingBatch; +import org.apache.kylin.common.util.StreamingMessage; +import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.KylinReducer; +import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.invertedindex.index.IncrementalSliceMaker; import org.apache.kylin.invertedindex.index.Slice; -import org.apache.kylin.invertedindex.index.TableRecord; +import org.apache.kylin.invertedindex.index.SliceBuilder; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.invertedindex.model.IIKeyValueCodec; import org.apache.kylin.invertedindex.model.IIRow; -import org.apache.kylin.metadata.model.SegmentStatusEnum; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; /** - * @author yangli9 */ -public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> { +public class InvertedIndexReducer extends KylinReducer<LongWritable, Object, ImmutableBytesWritable, ImmutableBytesWritable> { private TableRecordInfo info; - private TableRecord rec; - private IncrementalSliceMaker builder; private IIKeyValueCodec kv; + private IMRInput.IMRTableInputFormat flatTableInputFormat; + private SliceBuilder sliceBuilder; + private ArrayList<StreamingMessage> messages; + private int sliceSize; + private ImmutableBytesWritable immutableBytesWritable; + private ByteBuffer valueBuf; @Override protected void setup(Context context) throws IOException { @@ -56,44 +66,61 @@ public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBy KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); IIManager mgr = IIManager.getInstance(config); IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME)); - IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW); + IISegment seg = ii.getFirstSegment(); info = new TableRecordInfo(seg); - rec = info.createTableRecord(); - builder = null; kv = new IIKeyValueCodec(info.getDigest()); + flatTableInputFormat = MRUtil.getBatchCubingInputSide(ii.getFirstSegment()).getFlatTableInputFormat(); + sliceSize = ii.getDescriptor().getSliceSize(); + short shard = (short) context.getTaskAttemptID().getTaskID().getId(); + System.out.println("Generating to shard - " + shard); + sliceBuilder = new SliceBuilder(seg.getIIDesc(), shard, true); + messages = Lists.newArrayListWithCapacity(sliceSize); + immutableBytesWritable = new ImmutableBytesWritable(); + valueBuf = ByteBuffer.allocate(1024 * 1024); // 1MB } @Override - public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) // + public void reduce(LongWritable key, Iterable<Object> values, Context context) // throws IOException, InterruptedException { - for (ImmutableBytesWritable v : values) { - rec.setBytes(v.get(), v.getOffset(), v.getLength()); - - if (builder == null) { - builder = new IncrementalSliceMaker(info, rec.getShard()); - } - - //TODO: to delete this log - System.out.println(rec.getShard() + " - " + rec); - - Slice slice = builder.append(rec); - if (slice != null) { - output(slice, context); + for (Object v : values) { + String[] row = flatTableInputFormat.parseMapperInput(v); + messages.add((parse(row))); + if (messages.size() >= sliceSize) { + buildAndOutput(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), context); + messages = Lists.newArrayList(); } } } + private StreamingMessage parse(String[] row) { + return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object> emptyMap()); + } + @Override protected void cleanup(Context context) throws IOException, InterruptedException { - Slice slice = builder.close(); - if (slice != null) { - output(slice, context); + if (!messages.isEmpty()) { + buildAndOutput(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), context); + messages.clear(); } + } - private void output(Slice slice, Context context) throws IOException, InterruptedException { + private void buildAndOutput(StreamingBatch streamingBatch, Context context) throws IOException, InterruptedException { + final Slice slice = sliceBuilder.buildSlice(streamingBatch); + ImmutableBytesWritable value, dictionary; for (IIRow pair : kv.encodeKeyValue(slice)) { - context.write(pair.getKey(), pair.getValue()); + value = pair.getValue(); + dictionary = pair.getDictionary(); + int newLength = 4 + value.getLength() + dictionary.getLength(); + if (newLength > valueBuf.limit()) { + valueBuf = ByteBuffer.allocate(newLength); + } + valueBuf.clear(); + valueBuf.putInt(value.getLength()); + valueBuf.put(value.get(), value.getOffset(), value.getLength()); + valueBuf.put(dictionary.get(), dictionary.getOffset(), dictionary.getLength()); + immutableBytesWritable.set(valueBuf.array(), 0, newLength); + context.write(pair.getKey(), immutableBytesWritable); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java new file mode 100644 index 0000000..ef9b1c3 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateIIInfoAfterBuildStep.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.invertedindex; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.model.SegmentStatusEnum; + +import java.io.IOException; + +/** + */ +public class UpdateIIInfoAfterBuildStep extends AbstractExecutable { + + private static final String II_NAME = "iiName"; + private static final String JOB_ID = "jobId"; + + public UpdateIIInfoAfterBuildStep() { + super(); + } + + public void setInvertedIndexName(String cubeName) { + this.setParam(II_NAME, cubeName); + } + + private String getInvertedIndexName() { + return getParam(II_NAME); + } + + public void setJobId(String id) { + setParam(JOB_ID, id); + } + + private String getJobId() { + return getParam(JOB_ID); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + + IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv()); + IIInstance ii = mgr.getII(getInvertedIndexName()); + IISegment segment = ii.getFirstSegment(); + segment.setStatus(SegmentStatusEnum.READY); + + segment.setLastBuildJobID(getJobId()); + segment.setLastBuildTime(System.currentTimeMillis()); + + try { + mgr.updateII(ii); + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + } catch (IOException e) { + logger.error("fail to update inverted index after build", e); + return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java deleted file mode 100644 index 277dea5..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/UpdateInvertedIndexInfoAfterBuildStep.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.mr.invertedindex; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.engine.mr.CubingJob; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.metadata.model.SegmentStatusEnum; - -import java.io.IOException; - -/** - */ -public class UpdateInvertedIndexInfoAfterBuildStep extends AbstractExecutable { - - private static final String SEGMENT_ID = "segmentId"; - private static final String II_NAME = "iiName"; - private static final String JOB_ID = "jobId"; - - public UpdateInvertedIndexInfoAfterBuildStep() { - super(); - } - - public void setInvertedIndexName(String cubeName) { - this.setParam(II_NAME, cubeName); - } - - private String getInvertedIndexName() { - return getParam(II_NAME); - } - - public void setSegmentId(String segmentId) { - this.setParam(SEGMENT_ID, segmentId); - } - - private String getSegmentId() { - return getParam(SEGMENT_ID); - } - - public void setJobId(String id) { - setParam(JOB_ID, id); - } - - private String getJobId() { - return getParam(JOB_ID); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - - IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv()); - IIInstance ii = mgr.getII(getInvertedIndexName()); - IISegment segment = ii.getFirstSegment(); - segment.setStatus(SegmentStatusEnum.READY); - - segment.setLastBuildJobID(getJobId()); - segment.setLastBuildTime(System.currentTimeMillis()); - - try { - mgr.updateII(ii); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); - } catch (IOException e) { - logger.error("fail to update inverted index after build", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java index d8cb24b..1cf3d98 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java @@ -33,6 +33,8 @@ */ package org.apache.kylin.engine.streaming; +import org.apache.kylin.common.util.StreamingBatch; + /** */ public interface IStreamingInput { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java index 475e43a..4a3b8b8 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java @@ -36,6 +36,7 @@ package org.apache.kylin.engine.streaming; import java.util.Map; import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.common.util.StreamingBatch; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.engine.streaming.util.StreamingUtils; import org.apache.kylin.metadata.model.IBuildable; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java deleted file mode 100644 index c7c7d29..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatch.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * - * contributor license agreements. See the NOTICE file distributed with - * - * this work for additional information regarding copyright ownership. - * - * The ASF licenses this file to You under the Apache License, Version 2.0 - * - * (the "License"); you may not use this file except in compliance with - * - * the License. You may obtain a copy of the License at - * - * - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * - * - * Unless required by applicable law or agreed to in writing, software - * - * distributed under the License is distributed on an "AS IS" BASIS, - * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * - * See the License for the specific language governing permissions and - * - * limitations under the License. - * - * / - */ -package org.apache.kylin.engine.streaming; - -import java.util.List; - -import org.apache.kylin.common.util.Pair; - -/** - */ -public final class StreamingBatch { - - private final List<StreamingMessage> messages; - - private final Pair<Long, Long> timeRange; - - public StreamingBatch(List<StreamingMessage> messages, Pair<Long, Long> timeRange) { - this.messages = messages; - this.timeRange = timeRange; - } - - public List<StreamingMessage> getMessages() { - return messages; - } - - public Pair<Long, Long> getTimeRange() { - return timeRange; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java index d55ccdb..93cda2d 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java @@ -36,6 +36,7 @@ package org.apache.kylin.engine.streaming; import java.util.Map; import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.common.util.StreamingBatch; import org.apache.kylin.cube.inmemcubing.ICuboidWriter; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.metadata.model.IBuildable; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java deleted file mode 100644 index 7885902..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingMessage.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.apache.kylin.engine.streaming; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -/** - */ -public class StreamingMessage { - - private final List<String> data; - - private long offset; - - private long timestamp; - - private Map<String, Object> params; - - public static final StreamingMessage EOF = new StreamingMessage(Collections.<String> emptyList(), 0L, 0L, Collections.<String, Object> emptyMap()); - - public StreamingMessage(List<String> data, long offset, long timestamp, Map<String, Object> params) { - this.data = data; - this.offset = offset; - this.timestamp = timestamp; - this.params = params; - } - - public final List<String> getData() { - return data; - } - - public final long getOffset() { - return offset; - } - - public final long getTimestamp() { - return timestamp; - } - - public Map<String, Object> getParams() { - return params; - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java index 64a3061..ae72218 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java @@ -54,9 +54,9 @@ import org.apache.kylin.cube.inmemcubing.ICuboidWriter; import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder; import org.apache.kylin.cube.util.CubingUtils; import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.engine.streaming.StreamingBatch; +import org.apache.kylin.common.util.StreamingBatch; import org.apache.kylin.engine.streaming.StreamingBatchBuilder; -import org.apache.kylin.engine.streaming.StreamingMessage; +import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java deleted file mode 100644 index fa5a0b2..0000000 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/invertedindex/SliceBuilder.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.streaming.invertedindex; - -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.engine.streaming.StreamingBatch; -import org.apache.kylin.engine.streaming.StreamingMessage; -import org.apache.kylin.invertedindex.index.BatchSliceMaker; -import org.apache.kylin.invertedindex.index.Slice; -import org.apache.kylin.invertedindex.index.TableRecord; -import org.apache.kylin.invertedindex.index.TableRecordInfo; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.invertedindex.util.IIDictionaryBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.util.List; - -/** - */ -public final class SliceBuilder { - - private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class); - - private final BatchSliceMaker sliceMaker; - private final IIDesc iiDesc; - private final boolean useLocalDict; - - public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) { - this.iiDesc = desc; - this.sliceMaker = new BatchSliceMaker(desc, shard); - this.useLocalDict = useLocalDict; - } - - public Slice buildSlice(StreamingBatch microStreamBatch) { - final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() { - @Nullable - @Override - public List<String> apply(@Nullable StreamingMessage input) { - return input.getData(); - } - }); - final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()]; - TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries); - return build(messages, tableRecordInfo, dictionaries); - } - - private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) { - final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() { - @Nullable - @Override - public TableRecord apply(@Nullable List<String> input) { - TableRecord result = tableRecordInfo.createTableRecord(); - for (int i = 0; i < input.size(); i++) { - result.setValueString(i, input.get(i)); - } - return result; - } - })); - slice.setLocalDictionaries(localDictionary); - return slice; - } -}