Repository: incubator-kylin Updated Branches: refs/heads/KYLIN-1116 [created] 82bfa924c
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json index 237bdd7..4c0c3ef 100644 --- a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json +++ b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json @@ -4,49 +4,8 @@ "owner": null, "version": null, "cost": 10, - "status": "READY", - "segments": [ - { - "uuid": null, - "name": "19700101000000_20140901000000", - "status": "READY", - "dictionaries": { - "DEFAULT.TEST_KYLIN_FACT/LSTG_SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID/30c9d5f0-abe4-4d1c-a147-610234d90ff1.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/9355165b-06ad-4c04-977c-a251e66e7e98.dict", - "DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME": "/dict/DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME/d5c40465-75e1-40bc-a960-06308f0134a6.dict", - "DEFAULT.TEST_KYLIN_FACT/SELLER_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/SELLER_ID/fea3a18d-3f20-4b8b-a880-7af93e69241b.dict", - "EDW.TEST_SITES/CRE_USER": "/dict/EDW.TEST_SITES/CRE_USER/244af7a2-7352-4b30-811f-46e637d7a133.dict", - "DEFAULT.TEST_KYLIN_FACT/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict", - "EDW.TEST_CAL_DT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict", - "DEFAULT.TEST_KYLIN_FACT/TRANS_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/TRANS_ID/7fb8fed8-7f3b-4089-a85d-3ac07f575c82.dict", - "EDW.TEST_CAL_DT/WEEK_BEG_DT": "/dict/EDW.TEST_CAL_DT/WEEK_BEG_DT/962b5f64-bee1-49ee-a072-af882193b719.dict", - "DEFAULT.TEST_KYLIN_FACT/SLR_SEGMENT_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict", - "EDW.TEST_SITES/SITE_NAME": "/dict/EDW.TEST_SITES/SITE_NAME/f363531d-e969-4264-bffd-ac18f8f47220.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE/1fc93a94-1feb-4af4-8078-81a6f1b65e2b.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME/895739d6-27e1-4ecc-b798-5851c319ea40.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict", - "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict", - "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC/bfb86010-bf4c-4534-a2b0-59d020aed197.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1/8ae44fb8-b01a-4db1-a901-dc5f463038cb.dict", - "EDW.TEST_SITES/SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict", - "DEFAULT.TEST_KYLIN_FACT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3/278d7283-518a-4cd0-b6e1-2573b523bf1f.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER/bb1bb7a5-b02d-45eb-b3c8-f1a4515264ca.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/b2d6fae1-eaac-4ac2-8a01-42e5c8b5c198.dict" - }, - "storage_location_identifier": "test_III", - "date_range_start": 0, - "date_range_end": 0, - "size_kb": 0, - "input_records": 0, - "input_records_size": 0, - "last_build_time": 0, - "last_build_job_id": null, - "create_time": null, - "binary_signature": null - } - ], + "status": "DISABLED", + "segments": [], "last_modified": 1420016227424, "descriptor": "test_kylin_ii_inner_join_desc", "create_time": null, http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json index 07c1970..90c21bb 100644 --- a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json +++ b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json @@ -4,49 +4,8 @@ "owner": null, "version": null, "cost": 10, - "status": "READY", - "segments": [ - { - "uuid": null, - "name": "19700101000000_20140901000000", - "status": "READY", - "dictionaries": { - "DEFAULT.TEST_KYLIN_FACT/LSTG_SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID/30c9d5f0-abe4-4d1c-a147-610234d90ff1.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/9355165b-06ad-4c04-977c-a251e66e7e98.dict", - "DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME": "/dict/DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME/d5c40465-75e1-40bc-a960-06308f0134a6.dict", - "DEFAULT.TEST_KYLIN_FACT/SELLER_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/SELLER_ID/fea3a18d-3f20-4b8b-a880-7af93e69241b.dict", - "EDW.TEST_SITES/CRE_USER": "/dict/EDW.TEST_SITES/CRE_USER/244af7a2-7352-4b30-811f-46e637d7a133.dict", - "DEFAULT.TEST_KYLIN_FACT/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict", - "EDW.TEST_CAL_DT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict", - "DEFAULT.TEST_KYLIN_FACT/TRANS_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/TRANS_ID/7fb8fed8-7f3b-4089-a85d-3ac07f575c82.dict", - "EDW.TEST_CAL_DT/WEEK_BEG_DT": "/dict/EDW.TEST_CAL_DT/WEEK_BEG_DT/962b5f64-bee1-49ee-a072-af882193b719.dict", - "DEFAULT.TEST_KYLIN_FACT/SLR_SEGMENT_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict", - "EDW.TEST_SITES/SITE_NAME": "/dict/EDW.TEST_SITES/SITE_NAME/f363531d-e969-4264-bffd-ac18f8f47220.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE/1fc93a94-1feb-4af4-8078-81a6f1b65e2b.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME/895739d6-27e1-4ecc-b798-5851c319ea40.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict", - "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict", - "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC/bfb86010-bf4c-4534-a2b0-59d020aed197.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1/8ae44fb8-b01a-4db1-a901-dc5f463038cb.dict", - "EDW.TEST_SITES/SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict", - "DEFAULT.TEST_KYLIN_FACT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3/278d7283-518a-4cd0-b6e1-2573b523bf1f.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER/bb1bb7a5-b02d-45eb-b3c8-f1a4515264ca.dict", - "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/b2d6fae1-eaac-4ac2-8a01-42e5c8b5c198.dict" - }, - "storage_location_identifier": "test_III", - "date_range_start": 0, - "date_range_end": 0, - "size_kb": 0, - "input_records": 0, - "input_records_size": 0, - "last_build_time": 0, - "last_build_job_id": null, - "create_time": null, - "binary_signature": null - } - ], + "status": "DISABLED", + "segments": [], "last_modified": 1420016227424, "descriptor": "test_kylin_ii_left_join_desc", "create_time": null, http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json b/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json index a703ae4..9abe3ed 100644 --- a/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json +++ b/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json @@ -4,26 +4,8 @@ "owner": null, "version": null, "cost": 10, - "status": "READY", - "segments": [ - { - "uuid": null, - "name": "19700101000000_20190901000000", - "status": "READY", - "dictionaries": { - }, - "storage_location_identifier": "KYLIN_2STEAMTEST", - "date_range_start": 0, - "date_range_end": 0, - "size_kb": 0, - "input_records": 0, - "input_records_size": 0, - "last_build_time": 0, - "last_build_job_id": null, - "create_time": null, - "binary_signature": null - } - ], + "status": "DISABLED", + "segments": [], "last_modified": 0, "descriptor": "test_streaming_table_ii_desc", "create_time": null, http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java index 7aff714..5633004 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java @@ -32,13 +32,7 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.restclient.CaseInsensitiveStringCache; -import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.dict.DictionaryInfo; -import org.apache.kylin.dict.DictionaryManager; -import org.apache.kylin.dict.DistinctColumnValuesProvider; -import org.apache.kylin.invertedindex.model.IIDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.realization.IRealization; @@ -131,45 +125,7 @@ public class IIManager implements IRealizationProvider { } return result; } - - public void buildInvertedIndexDictionary(IISegment iiSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException { - logger.info("Start building ii dictionary"); - DictionaryManager dictMgr = getDictionaryManager(); - IIDesc iiDesc = iiSeg.getIIInstance().getDescriptor(); - for (TblColRef column : iiDesc.listAllColumns()) { - logger.info("Dealing with column {}", column); - if (iiDesc.isMetricsCol(column)) { - continue; - } - - DictionaryInfo dict = dictMgr.buildDictionary(iiDesc.getModel(), "true", column, factTableValueProvider); - iiSeg.putDictResPath(column, dict.getResourcePath()); - } - updateII(iiSeg.getIIInstance()); - } - - /** - * return null if no dictionary for given column - */ - public Dictionary<?> getDictionary(IISegment iiSeg, TblColRef col) { - DictionaryInfo info = null; - try { - DictionaryManager dictMgr = getDictionaryManager(); - // logger.info("Using metadata url " + metadataUrl + - // " for DictionaryManager"); - String dictResPath = iiSeg.getDictResPath(col); - if (dictResPath == null) - return null; - - info = dictMgr.getDictionaryInfo(dictResPath); - if (info == null) - throw new IllegalStateException("No dictionary found by " + dictResPath + ", invalid II state; II segment" + iiSeg + ", col " + col); - } catch (IOException e) { - throw new IllegalStateException("Failed to get dictionary for II segment" + iiSeg + ", col" + col, e); - } - - return info.getDictionaryObject(); - } + public IIInstance createII(IIInstance ii) throws IOException { @@ -300,10 +256,6 @@ public class IIManager implements IRealizationProvider { } } - private DictionaryManager getDictionaryManager() { - return DictionaryManager.getInstance(config); - } - private ResourceStore getStore() { return ResourceStore.getStore(this.config); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java index adcca8b..c3ca464 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java @@ -19,18 +19,14 @@ package org.apache.kylin.invertedindex; import java.text.SimpleDateFormat; -import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.TimeZone; -import java.util.concurrent.ConcurrentHashMap; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.IDictionaryAware; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.invertedindex.model.IIDesc; import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc; -import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; @@ -49,7 +45,7 @@ import org.apache.kylin.metadata.realization.IRealizationSegment; // TODO: remove segment concept for II, append old hbase table @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class IISegment implements Comparable<IISegment>, IDictionaryAware, IRealizationSegment { +public class IISegment implements Comparable<IISegment>, IRealizationSegment { @JsonBackReference private IIInstance iiInstance; @@ -83,11 +79,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal private String binarySignature; // a hash of schema and dictionary ID, // used for sanity check - @JsonProperty("dictionaries") - private ConcurrentHashMap<String, String> dictionaries; // table/column ==> - // dictionary - // resource path - private transient TableRecordInfo tableRecordInfo; /** @@ -216,28 +207,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal return storageLocationIdentifier; } - public Map<String, String> getDictionaries() { - if (dictionaries == null) - dictionaries = new ConcurrentHashMap<String, String>(); - return dictionaries; - } - - public Collection<String> getDictionaryPaths() { - return getDictionaries().values(); - } - - public String getDictResPath(TblColRef col) { - return getDictionaries().get(dictKey(col)); - } - - public void putDictResPath(TblColRef col, String dictResPath) { - getDictionaries().put(dictKey(col), dictResPath); - } - - private String dictKey(TblColRef col) { - return col.getTable() + "/" + col.getName(); - } - /** * @param storageLocationIdentifier the storageLocationIdentifier to set */ @@ -262,10 +231,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal return tableRecordInfo; } - // public void updateDictionary(List<Dictionary<?>> dicts) { - // getTableRecordInfo().updateDictionary( dicts); - // } - public List<TblColRef> getColumns() { return this.getTableRecordInfo().getColumns(); } @@ -275,20 +240,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal return Objects.toStringHelper(this).add("uuid", uuid).add("create_time_utc:", createTimeUTC).add("name", name).add("last_build_job_id", lastBuildJobID).add("status", status).toString(); } - @Override - public int getColumnLength(TblColRef col) { - - int index = getTableRecordInfo().findColumn(col); - return getTableRecordInfo().getDigest().length(index); - } - - @Override - public Dictionary<?> getDictionary(TblColRef col) { - - int index = getTableRecordInfo().findColumn(col); - return getTableRecordInfo().dict(index); - } - public long getCreateTimeUTC() { return createTimeUTC; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java new file mode 100644 index 0000000..1c293d7 --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.invertedindex.index; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.StreamingBatch; +import org.apache.kylin.common.util.StreamingMessage; +import org.apache.kylin.invertedindex.model.IIDesc; +import org.apache.kylin.invertedindex.util.IIDictionaryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.List; + +/** + */ +public final class SliceBuilder { + + private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class); + + private final BatchSliceMaker sliceMaker; + private final IIDesc iiDesc; + private final boolean useLocalDict; + + public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) { + this.iiDesc = desc; + this.sliceMaker = new BatchSliceMaker(desc, shard); + this.useLocalDict = useLocalDict; + } + + public Slice buildSlice(StreamingBatch microStreamBatch) { + final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() { + @Nullable + @Override + public List<String> apply(@Nullable StreamingMessage input) { + return input.getData(); + } + }); + final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()]; + TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries); + return build(messages, tableRecordInfo, dictionaries); + } + + private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) { + final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() { + @Nullable + @Override + public TableRecord apply(@Nullable List<String> input) { + TableRecord result = tableRecordInfo.createTableRecord(); + for (int i = 0; i < input.size(); i++) { + result.setValueString(i, input.get(i)); + } + return result; + } + })); + slice.setLocalDictionaries(localDictionary); + return slice; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java index baafacd..8a0c2ba 100644 --- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java +++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java @@ -53,11 +53,6 @@ public class IIInstanceTest extends LocalFileMetadataTestCase { Assert.assertTrue(iiInstances.size() > 0); - IIInstance instance = iiInstances.get(0); - - Dictionary dict = mgr.getDictionary(instance.getFirstSegment(), instance.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_SITE_ID")); - - Assert.assertNotNull(dict); } @Test http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 1eb2683..b8d1333 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; @@ -78,6 +79,8 @@ public class HiveMRInput implements IMRInput { try { HCatInputFormat.setInput(job, dbName, tableName); job.setInputFormatClass(HCatInputFormat.class); + + job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class); } catch (IOException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index a1ab712..ee5a555 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -48,8 +48,8 @@ import kafka.message.MessageAndOffset; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.engine.streaming.IStreamingInput; -import org.apache.kylin.engine.streaming.StreamingBatch; -import org.apache.kylin.engine.streaming.StreamingMessage; +import org.apache.kylin.common.util.StreamingBatch; +import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.kylin.source.kafka.util.KafkaRequester; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java index aace8bc..3455f1d 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java @@ -47,7 +47,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.engine.streaming.StreamingManager; -import org.apache.kylin.engine.streaming.StreamingMessage; +import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.metadata.model.IntermediateColumnDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.kafka.config.KafkaConfig; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java index c0e506f..9691ea7 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java @@ -40,7 +40,7 @@ import java.util.List; import kafka.message.MessageAndOffset; -import org.apache.kylin.engine.streaming.StreamingMessage; +import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.metadata.model.TblColRef; import com.google.common.collect.Lists; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index 4fae228..00f93a5 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -43,7 +43,7 @@ import kafka.message.MessageAndOffset; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.TimeUtil; -import org.apache.kylin.engine.streaming.StreamingMessage; +import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java index de5e58e..0e29a0c 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java @@ -40,7 +40,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.DaemonThreadFactory; import org.apache.kylin.common.util.OptionsHelper; -import org.apache.kylin.engine.streaming.StreamingMessage; +import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.StreamingParser; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java index 2833ea4..96b7fa7 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java @@ -11,7 +11,7 @@ import kafka.javaapi.PartitionMetadata; import kafka.message.MessageAndOffset; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.engine.streaming.StreamingMessage; +import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.source.kafka.StreamingParser; import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java index e4b688f..0e0a8ce 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java @@ -19,6 +19,7 @@ package org.apache.kylin.storage.hbase.ii; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; @@ -43,13 +44,29 @@ public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, Imm @Override protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException { + ByteBuffer buffer = ByteBuffer.wrap(value.get(), value.getOffset(), value.getLength()); + int totalLength = value.getLength(); + int valueLength = buffer.getInt(); + int dictionaryLength = totalLength - valueLength - 4; KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), // IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, // IIDesc.HBASE_QUALIFIER_BYTES, 0, IIDesc.HBASE_QUALIFIER_BYTES.length, // timestamp, Type.Put, // - value.get(), value.getOffset(), value.getLength()); + buffer.array(), buffer.position(), valueLength); + // write value context.write(key, kv); + + kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), // + IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, // + IIDesc.HBASE_DICTIONARY_BYTES, 0, IIDesc.HBASE_DICTIONARY_BYTES.length, // + timestamp, Type.Put, // + buffer.array(), buffer.position() + valueLength, dictionaryLength); + + + // write dictionary + context.write(key, kv); + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java index 0a72a91..bcfe346 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java @@ -47,7 +47,7 @@ public class IICreateHTableJob extends AbstractHadoopJob { @Override public int run(String[] args) throws Exception { Options options = new Options(); - + HBaseAdmin admin = null; try { options.addOption(OPTION_II_NAME); options.addOption(OPTION_HTABLE_NAME); @@ -61,6 +61,22 @@ public class IICreateHTableJob extends AbstractHadoopJob { IIInstance ii = iiManager.getII(iiName); int sharding = ii.getDescriptor().getSharding(); + + Configuration conf = HBaseConfiguration.create(getConf()); + // check if the table already exists + admin = new HBaseAdmin(conf); + if (admin.tableExists(tableName)) { + if (admin.isTableEnabled(tableName)) { + logger.info("Table " + tableName + " already exists and is enabled, no need to create."); + return 0; + } else { + logger.error("Table " + tableName + " is disabled, couldn't append data"); + return 1; + } + } + + // table doesn't exist, need to create + HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName)); HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY); cf.setMaxVersions(1); @@ -100,7 +116,6 @@ public class IICreateHTableJob extends AbstractHadoopJob { tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis())); tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName()); - Configuration conf = HBaseConfiguration.create(getConf()); if (User.isHBaseSecurityEnabled(conf)) { // add coprocessor for bulk load tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); @@ -108,13 +123,7 @@ public class IICreateHTableJob extends AbstractHadoopJob { IIDeployCoprocessorCLI.deployCoprocessor(tableDesc); - // drop the table first - HBaseAdmin admin = new HBaseAdmin(conf); - if (admin.tableExists(tableName)) { - admin.disableTable(tableName); - admin.deleteTable(tableName); - } - + // create table byte[][] splitKeys = getSplits(sharding); if (splitKeys.length == 0) @@ -126,12 +135,14 @@ public class IICreateHTableJob extends AbstractHadoopJob { } } System.out.println("create hbase table " + tableName + " done."); - admin.close(); return 0; } catch (Exception e) { printUsage(options); throw e; + } finally { + if (admin != null) + admin.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java index ff8b659..11c1711 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java @@ -54,7 +54,7 @@ public class HBaseMROutput implements IMROutput { @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { - steps.addCubingGarbageCollectionSteps(jobFlow); + steps.addInvertedIndexGarbageCollectionSteps(jobFlow); } }; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java index 1267d2d..03db6b0 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java @@ -184,7 +184,7 @@ public class HBaseMRSteps extends JobBuilderSupport { } public void addSaveIIToHTableSteps(DefaultChainedExecutable jobFlow, String rootPath) { - // create htable step + // create htable if it doesn't exist jobFlow.addTask(createCreateIIHTableStep(seg)); final String iiPath = rootPath + "*"; @@ -198,6 +198,22 @@ public class HBaseMRSteps extends JobBuilderSupport { } + public void addInvertedIndexGarbageCollectionSteps(DefaultChainedExecutable jobFlow) { + String jobId = jobFlow.getId(); + + List<String> toDeletePaths = new ArrayList<>(); + toDeletePaths.add(getJobWorkingDir(jobId)); + + HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep(); + step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION); + step.setDeletePaths(toDeletePaths); + step.setJobId(jobId); + + jobFlow.addTask(step); + } + + + private HadoopShellExecutable createCreateIIHTableStep(IRealizationSegment seg) { HadoopShellExecutable createHtableStep = new HadoopShellExecutable(); createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index b200c2e..15dc993 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -46,7 +46,6 @@ import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.IISegment; @@ -108,7 +107,7 @@ public class DeployCoprocessorCLI { private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); + Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java index 88cb7de..2137f57 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java @@ -35,8 +35,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java index 335e00c..6c6ed80 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java @@ -6,6 +6,7 @@ import java.util.Arrays; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.filter.ColumnTupleFilter; @@ -40,7 +41,10 @@ public class TsConditionEraserTest extends LocalFileMetadataTestCase { @Before public void setup() throws IOException { this.createTestMetadata(); - this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join"); + IIManager iiManager = IIManager.getInstance(getTestConfig()); + this.ii = iiManager.getII("test_kylin_ii_left_join"); + IISegment segment = iiManager.buildSegment(ii, 0, System.currentTimeMillis()); + ii.getSegments().add(segment); this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment()); this.factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT"); this.caldt = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT"); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java index e271129..8b56605 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java @@ -18,20 +18,11 @@ package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; - +import com.google.common.collect.Lists; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.metadata.measure.LongMutable; import org.apache.kylin.metadata.measure.MeasureAggregator; @@ -45,7 +36,11 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.*; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; /** * @@ -90,6 +85,10 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase { @Test public void testSerializeAggregator() { final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join"); + if (ii.getFirstSegment() == null) { + IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis()); + ii.getSegments().add(segment); + } final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment()); final EndpointAggregators endpointAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations()); byte[] x = EndpointAggregators.serialize(endpointAggregators); @@ -139,6 +138,10 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase { @Test public void basicTest() { final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join"); + if (ii.getFirstSegment() == null) { + IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis()); + ii.getSegments().add(segment); + } final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment()); final EndpointAggregators aggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations()); final EndpointAggregationCache aggCache = new EndpointAggregationCache(aggregators); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java index 791002f..3e34495 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.invertedindex.index.TableRecordInfoDigest; import org.junit.After; @@ -41,6 +42,10 @@ public class TableRecordInfoTest extends LocalFileMetadataTestCase { public void setup() throws IOException { this.createTestMetadata(); this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join"); + if (ii.getFirstSegment() == null) { + IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis()); + ii.getSegments().add(segment); + } this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment()); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/82bfa924/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java index 412e335..4e5a9d9 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.filter.ColumnTupleFilter; @@ -58,6 +59,10 @@ public class TsConditionExtractorTest extends LocalFileMetadataTestCase { public void setup() throws IOException { this.createTestMetadata(); this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join"); + if (ii.getFirstSegment() == null) { + IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis()); + ii.getSegments().add(segment); + } this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment()); this.factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT"); this.calDt = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT");