http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java deleted file mode 100644 index 7430c4e..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.basic; - -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.common.datatype.DoubleMutable; - -/** - */ -@SuppressWarnings("serial") -public class DoubleMinAggregator extends MeasureAggregator<DoubleMutable> { - - DoubleMutable min = null; - - @Override - public void reset() { - min = null; - } - - @Override - public void aggregate(DoubleMutable value) { - if (min == null) - min = new DoubleMutable(value.get()); - else if (min.get() > value.get()) - min.set(value.get()); - } - - @Override - public DoubleMutable getState() { - return min; - } - - @Override - public int getMemBytesEstimate() { - return guessDoubleMemBytes(); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java deleted file mode 100644 index 6e66c1b..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.basic; - -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.common.datatype.DoubleMutable; - -/** - */ -@SuppressWarnings("serial") -public class DoubleSumAggregator extends MeasureAggregator<DoubleMutable> { - - DoubleMutable sum = new DoubleMutable(); - - @Override - public void reset() { - sum.set(0.0); - } - - @Override - public void aggregate(DoubleMutable value) { - sum.set(sum.get() + value.get()); - } - - @Override - public DoubleMutable getState() { - return sum; - } - - @Override - public int getMemBytesEstimate() { - return guessDoubleMemBytes(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java deleted file mode 100644 index 7fdf3d8..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.basic; - -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.common.datatype.LongMutable; - -/** - */ -@SuppressWarnings("serial") -public class LongMaxAggregator extends MeasureAggregator<LongMutable> { - - LongMutable max = null; - - @Override - public void reset() { - max = null; - } - - @Override - public void aggregate(LongMutable value) { - if (max == null) - max = new LongMutable(value.get()); - else if (max.get() < value.get()) - max.set(value.get()); - } - - @Override - public LongMutable getState() { - return max; - } - - @Override - public int getMemBytesEstimate() { - return guessLongMemBytes(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java deleted file mode 100644 index 22ae865..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.basic; - -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.common.datatype.LongMutable; - -/** - */ -@SuppressWarnings("serial") -public class LongMinAggregator extends MeasureAggregator<LongMutable> { - - LongMutable min = null; - - @Override - public void reset() { - min = null; - } - - @Override - public void aggregate(LongMutable value) { - if (min == null) - min = new LongMutable(value.get()); - else if (min.get() > value.get()) - min.set(value.get()); - } - - @Override - public LongMutable getState() { - return min; - } - - @Override - public int getMemBytesEstimate() { - return guessLongMemBytes(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java deleted file mode 100644 index 38d728a..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.basic; - -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.common.datatype.LongMutable; - -/** - */ -@SuppressWarnings("serial") -public class LongSumAggregator extends MeasureAggregator<LongMutable> { - - LongMutable sum = new LongMutable(); - - @Override - public void reset() { - sum.set(0); - } - - @Override - public void aggregate(LongMutable value) { - sum.set(sum.get() + value.get()); - } - - @Override - public LongMutable getState() { - return sum; - } - - @Override - public int getMemBytesEstimate() { - return guessLongMemBytes(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java deleted file mode 100644 index d5ceba5..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.hllc; - -import java.util.List; - -import org.apache.kylin.aggregation.AggregationType; -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.common.datatype.DataType; -import org.apache.kylin.common.datatype.DataTypeSerializer; -import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.TblColRef; - -public class HLLCAggregation extends AggregationType { - - private final DataType dataType; - - public HLLCAggregation(String dataType) { - this.dataType = DataType.getType(dataType); - - if (this.dataType.getPrecision() < 10 || this.dataType.getPrecision() > 16) - throw new IllegalArgumentException("HLLC precision must be between 10 and 16"); - } - - @Override - public DataType getAggregationDataType() { - return dataType; - } - - @Override - public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() { - return HLLCSerializer.class; - } - - @Override - public void validate(MeasureDesc measureDesc) throws IllegalArgumentException { - // TODO Auto-generated method stub - - } - - @Override - public MeasureAggregator<?> newAggregator() { - if (dataType.isHLLC()) - return new HLLCAggregator(dataType.getPrecision()); - else - return new LDCAggregator(); - } - - @Override - public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) { - // TODO Auto-generated method stub - return null; - } - - @Override - public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) { - // TODO Auto-generated method stub - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java deleted file mode 100644 index 18c021d..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.hllc; - -import org.apache.kylin.aggregation.AggregationType; -import org.apache.kylin.aggregation.IAggregationFactory; -import org.apache.kylin.metadata.model.FunctionDesc; - -public class HLLCAggregationFactory implements IAggregationFactory { - - @Override - public AggregationType createAggregationType(String funcName, String dataType) { - if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName) == false) - throw new IllegalArgumentException(); - - return new HLLCAggregation(dataType); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java deleted file mode 100644 index 8f85fe8..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.hllc; - -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; - -/** - */ -@SuppressWarnings("serial") -public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> { - - final int precision; - HyperLogLogPlusCounter sum = null; - - public HLLCAggregator(int precision) { - this.precision = precision; - } - - @Override - public void reset() { - sum = null; - } - - @Override - public void aggregate(HyperLogLogPlusCounter value) { - if (sum == null) - sum = new HyperLogLogPlusCounter(value); - else - sum.merge(value); - } - - @Override - public HyperLogLogPlusCounter getState() { - return sum; - } - - @Override - public int getMemBytesEstimate() { - // 1024 + 60 returned by AggregationCacheMemSizeTest - return 8 // aggregator obj shell - + 4 // precision - + 8 // ref to HLLC - + 8 // HLLC obj shell - + 32 + (1 << precision); // HLLC internal - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java deleted file mode 100644 index 5612892..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.hllc; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.kylin.common.datatype.DataType; -import org.apache.kylin.common.datatype.DataTypeSerializer; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; - -/** - * @author yangli9 - * - */ -public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> { - - // be thread-safe and avoid repeated obj creation - private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>(); - - private int precision; - - public HLLCSerializer(DataType type) { - this.precision = type.getPrecision(); - } - - @Override - public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) { - try { - value.writeRegisters(out); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private HyperLogLogPlusCounter current() { - HyperLogLogPlusCounter hllc = current.get(); - if (hllc == null) { - hllc = new HyperLogLogPlusCounter(precision); - current.set(hllc); - } - return hllc; - } - - @Override - public HyperLogLogPlusCounter deserialize(ByteBuffer in) { - HyperLogLogPlusCounter hllc = current(); - try { - hllc.readRegisters(in); - } catch (IOException e) { - throw new RuntimeException(e); - } - return hllc; - } - - @Override - public int peekLength(ByteBuffer in) { - return current().peekLength(in); - } - - @Override - public int maxLength() { - return current().maxLength(); - } - - @Override - public int getStorageBytesEstimate() { - return current().maxLength(); - } - - @Override - public HyperLogLogPlusCounter valueOf(byte[] value) { - HyperLogLogPlusCounter hllc = current(); - hllc.clear(); - if (value == null) - hllc.add("__nUlL__"); - else - hllc.add(value); - return hllc; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java deleted file mode 100644 index 151c1ee..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.hllc; - -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.common.datatype.LongMutable; - -/** - * Long Distinct Count - */ -@SuppressWarnings("serial") -public class LDCAggregator extends MeasureAggregator<LongMutable> { - - private static LongMutable ZERO = new LongMutable(0); - - private HLLCAggregator hllAgg = null; - private LongMutable state = new LongMutable(0); - - @SuppressWarnings("rawtypes") - public void setDependentAggregator(MeasureAggregator agg) { - this.hllAgg = (HLLCAggregator) agg; - } - - @Override - public void reset() { - } - - @Override - public void aggregate(LongMutable value) { - } - - @Override - public LongMutable getState() { - if (hllAgg == null) { - return ZERO; - } else { - state.set(hllAgg.getState().getCountEstimate()); - return state; - } - } - - @Override - public int getMemBytesEstimate() { - return guessLongMemBytes(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java deleted file mode 100644 index 251abd9..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.topn; - -import java.util.List; - -import org.apache.kylin.aggregation.AggregationType; -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.aggregation.hllc.HLLCSerializer; -import org.apache.kylin.common.datatype.DataType; -import org.apache.kylin.common.datatype.DataTypeSerializer; -import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.TblColRef; - -public class TopNAggregation extends AggregationType { - - private final DataType dataType; - - public TopNAggregation(String dataType) { - this.dataType = DataType.getType(dataType); - - if (this.dataType.getPrecision() < 1 || this.dataType.getPrecision() > 1000) - throw new IllegalArgumentException("TopN precision must be between 1 and 1000"); - } - - @Override - public DataType getAggregationDataType() { - return dataType; - } - - @Override - public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() { - return HLLCSerializer.class; - } - - @Override - public void validate(MeasureDesc measureDesc) throws IllegalArgumentException { - // TODO Auto-generated method stub - - } - - @Override - public MeasureAggregator<?> newAggregator() { - return new TopNAggregator(); - } - - @Override - public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) { - // TODO Auto-generated method stub - return null; - } - - @Override - public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) { - // TODO Auto-generated method stub - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java deleted file mode 100644 index 1ea22c8..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.topn; - -import org.apache.kylin.aggregation.AggregationType; -import org.apache.kylin.aggregation.IAggregationFactory; -import org.apache.kylin.metadata.model.FunctionDesc; - -public class TopNAggregationFactory implements IAggregationFactory { - - @Override - public AggregationType createAggregationType(String funcName, String dataType) { - if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName) == false) - throw new IllegalArgumentException(); - - return new TopNAggregation(dataType); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java deleted file mode 100644 index 4f6c7ee..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.topn; - -import java.util.Map; - -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.common.topn.TopNCounter; -import org.apache.kylin.common.util.ByteArray; - -import com.google.common.collect.Maps; - -/** - * - */ -@SuppressWarnings("serial") -public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> { - - int capacity = 0; - TopNCounter<ByteArray> sum = null; - Map<ByteArray, Double> sanityCheckMap; - - @Override - public void reset() { - sum = null; - } - - @Override - public void aggregate(TopNCounter<ByteArray> value) { - if (sum == null) { - capacity = value.getCapacity(); - sum = new TopNCounter<>(capacity); - sanityCheckMap = Maps.newHashMap(); - } - sum.merge(value); - } - - @Override - public TopNCounter<ByteArray> getState() { - - //sum.retain(capacity); - return sum; - } - - @Override - public int getMemBytesEstimate() { - return 8 * capacity / 4; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java deleted file mode 100644 index 8088842..0000000 --- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.aggregation.topn; - -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.List; - -import org.apache.kylin.common.datatype.DataType; -import org.apache.kylin.common.datatype.DataTypeSerializer; -import org.apache.kylin.common.topn.Counter; -import org.apache.kylin.common.topn.DoubleDeltaSerializer; -import org.apache.kylin.common.topn.TopNCounter; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.BytesUtil; - -/** - * - */ -public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> { - - private DoubleDeltaSerializer dds = new DoubleDeltaSerializer(3); - - private int precision; - - public TopNCounterSerializer(DataType dataType) { - this.precision = dataType.getPrecision(); - } - - @Override - public int peekLength(ByteBuffer in) { - int mark = in.position(); - @SuppressWarnings("unused") - int capacity = in.getInt(); - int size = in.getInt(); - int keyLength = in.getInt(); - dds.deserialize(in); - int len = in.position() - mark + keyLength * size; - in.position(mark); - return len; - } - - @Override - public int maxLength() { - return precision * TopNCounter.EXTRA_SPACE_RATE * (4 + 8); - } - - @Override - public int getStorageBytesEstimate() { - return precision * TopNCounter.EXTRA_SPACE_RATE * 8; - } - - @Override - public TopNCounter<ByteArray> valueOf(byte[] value) { - ByteBuffer buffer = ByteBuffer.wrap(value); - int sizeOfId = buffer.getInt(); - int keyEncodedValue = buffer.getInt(); - double counter = buffer.getDouble(); - - ByteArray key = new ByteArray(sizeOfId); - BytesUtil.writeUnsigned(keyEncodedValue, key.array(), 0, sizeOfId); - - TopNCounter<ByteArray> topNCounter = new TopNCounter<ByteArray>(precision * TopNCounter.EXTRA_SPACE_RATE); - topNCounter.offer(key, counter); - return topNCounter; - } - - @Override - public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) { - double[] counters = value.getCounters(); - List<ByteArray> peek = value.peek(1); - int keyLength = peek.size() > 0 ? peek.get(0).length() : 0; - out.putInt(value.getCapacity()); - out.putInt(value.size()); - out.putInt(keyLength); - dds.serialize(counters, out); - Iterator<Counter<ByteArray>> iterator = value.iterator(); - while (iterator.hasNext()) { - out.put(iterator.next().getItem().array()); - } - } - - @Override - public TopNCounter<ByteArray> deserialize(ByteBuffer in) { - int capacity = in.getInt(); - int size = in.getInt(); - int keyLength = in.getInt(); - double[] counters = dds.deserialize(in); - - TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity); - ByteArray byteArray; - for (int i = 0; i < size; i++) { - byteArray = new ByteArray(keyLength); - in.get(byteArray.array()); - counter.offerToHead(byteArray, counters[i]); - } - - return counter; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 4592b15..d909a4c 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -38,10 +38,10 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.restclient.CaseInsensitiveStringCache; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.DimensionDesc; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.dict.DistinctColumnValuesProvider; @@ -182,7 +182,8 @@ public class CubeManager implements IRealizationProvider { /** * return null if no dictionary for given column */ - public Dictionary<?> getDictionary(CubeSegment cubeSeg, TblColRef col) { + @SuppressWarnings("unchecked") + public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) { DictionaryInfo info = null; try { DictionaryManager dictMgr = getDictionaryManager(); @@ -199,7 +200,7 @@ public class CubeManager implements IRealizationProvider { throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col, e); } - return info.getDictionaryObject(); + return (Dictionary<String>) info.getDictionaryObject(); } public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 62df1e9..b29f83a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -25,17 +25,17 @@ import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.IDictionaryAware; -import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; -import org.apache.kylin.metadata.model.IStorageAware; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.IRealization; +import org.apache.kylin.metadata.realization.IRealizationSegment; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; @@ -43,8 +43,6 @@ import com.fasterxml.jackson.annotation.JsonBackReference; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Objects; import com.google.common.collect.Maps; -import org.apache.kylin.metadata.realization.IRealization; -import org.apache.kylin.metadata.realization.IRealizationSegment; @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IRealizationSegment { @@ -267,6 +265,14 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I public void setStorageLocationIdentifier(String storageLocationIdentifier) { this.storageLocationIdentifier = storageLocationIdentifier; } + + public Map<TblColRef, Dictionary<String>> buildDictionaryMap() { + Map<TblColRef, Dictionary<String>> result = Maps.newHashMap(); + for (TblColRef col : getCubeDesc().getAllColumnsNeedDictionary()) { + result.put(col, (Dictionary<String>) getDictionary(col)); + } + return result; + } @Override public int getColumnLength(TblColRef col) { @@ -279,7 +285,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I } @Override - public Dictionary<?> getDictionary(TblColRef col) { + public Dictionary<String> getDictionary(TblColRef col) { return CubeManager.getInstance(this.getCubeInstance().getConfig()).getDictionary(this, col); } @@ -427,4 +433,5 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I public IJoinedFlatTableDesc getJoinedFlatTableDesc() { return new CubeJoinedFlatTableDesc(this.getCubeDesc(), this); } + } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java index 3619d69..7f38c26 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java @@ -1,23 +1,21 @@ package org.apache.kylin.cube.gridtable; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.Map; -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.common.datatype.DataTypeSerializer; -import org.apache.kylin.common.datatype.StringSerializer; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.gridtable.DefaultGTComparator; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.IGTCodeSystem; import org.apache.kylin.gridtable.IGTComparator; +import org.apache.kylin.measure.MeasureAggregator; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; /** * defines how column values will be encoded to/ decoded from GTRecord @@ -111,9 +109,6 @@ public class CubeCodeSystem implements IGTCodeSystem { if (serializer instanceof DictionarySerializer) { ((DictionarySerializer) serializer).serializeWithRounding(value, roundingFlag, buf); } else { - if ((value instanceof String) && (!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer))) { - value = serializer.valueOf((String) value); - } serializer.serialize(value, buf); } } @@ -176,11 +171,6 @@ public class CubeCodeSystem implements IGTCodeSystem { } @Override - public Object valueOf(byte[] value) { - throw new UnsupportedOperationException(); - } - - @Override public void serialize(Object value, ByteBuffer out) { throw new UnsupportedOperationException(); } @@ -230,10 +220,6 @@ public class CubeCodeSystem implements IGTCodeSystem { return dictionary.getSizeOfId(); } - @Override - public Object valueOf(byte[] value) { - throw new UnsupportedOperationException(); - } } static class FixLenSerializer extends DataTypeSerializer { @@ -306,16 +292,6 @@ public class CubeCodeSystem implements IGTCodeSystem { return fixLen; } - @Override - public Object valueOf(byte[] value) { - try { - return new String(value, "UTF-8"); - } catch (UnsupportedEncodingException e) { - // does not happen - throw new RuntimeException(e); - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java index aa0a530..05fc8a5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java @@ -3,11 +3,11 @@ package org.apache.kylin.cube.gridtable; import java.util.List; import java.util.Map; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.metadata.model.TblColRef; @@ -16,24 +16,25 @@ import com.google.common.collect.Maps; @SuppressWarnings("rawtypes") public class CubeGridTable { - public static Map<TblColRef, Dictionary<?>> getDimensionToDictionaryMap(CubeSegment cubeSeg, long cuboidId) { + public static Map<TblColRef, Dictionary<String>> getDimensionToDictionaryMap(CubeSegment cubeSeg, long cuboidId) { CubeDesc cubeDesc = cubeSeg.getCubeDesc(); CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig()); // build a dictionary map - Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap(); + Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap(); List<TblColRef> dimCols = Cuboid.findById(cubeDesc, cuboidId).getColumns(); for (TblColRef col : dimCols) { - Dictionary<?> dictionary = cubeMgr.getDictionary(cubeSeg, col); + Dictionary<String> dictionary = cubeMgr.getDictionary(cubeSeg, col); if (dictionary != null) { dictionaryMap.put(col, dictionary); } } + return dictionaryMap; } public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) throws NotEnoughGTInfoException { - Map<TblColRef, Dictionary<?>> dictionaryMap = getDimensionToDictionaryMap(cubeSeg, cuboidId); + Map<TblColRef, Dictionary<String>> dictionaryMap = getDimensionToDictionaryMap(cubeSeg, cuboidId); Cuboid cuboid = Cuboid.findById(cubeSeg.getCubeDesc(), cuboidId); for (TblColRef dim : cuboid.getColumns()) { if (cubeSeg.getCubeDesc().getRowkey().isUseDictionary(dim)) { @@ -47,7 +48,7 @@ public class CubeGridTable { return newGTInfo(cubeSeg.getCubeDesc(), cuboidId, dictionaryMap); } - public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<?>> dictionaryMap) { + public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<String>> dictionaryMap) { Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId); CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java index 36db773..2152301 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java @@ -6,11 +6,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.kylin.common.datatype.DataType; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.HBaseColumnDesc; import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java index 045b11e..9bbcf75 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java @@ -18,5 +18,6 @@ package org.apache.kylin.cube.gridtable; +@SuppressWarnings("serial") public class NotEnoughGTInfoException extends Exception { } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java index d30186e..c4d0a7e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java @@ -23,12 +23,12 @@ package org.apache.kylin.cube.gridtable; import java.nio.ByteBuffer; import java.util.Map; -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.common.datatype.DataTypeSerializer; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.IGTCodeSystem; import org.apache.kylin.gridtable.IGTComparator; +import org.apache.kylin.measure.MeasureAggregator; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; @SuppressWarnings({ "rawtypes", "unchecked" }) public class TrimmedCubeCodeSystem implements IGTCodeSystem { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java index 0a35559..58f94c1 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java @@ -21,8 +21,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GridTable; @@ -39,12 +39,12 @@ abstract public class AbstractInMemCubeBuilder { private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class); final protected CubeDesc cubeDesc; - final protected Map<TblColRef, Dictionary<?>> dictionaryMap; + final protected Map<TblColRef, Dictionary<String>> dictionaryMap; protected int taskThreadCount = 4; protected int reserveMemoryMB = 200; - public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) { + public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { if (cubeDesc == null) throw new NullPointerException(); if (dictionaryMap == null) http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java index ce912a3..5b6131f 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java @@ -29,15 +29,15 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.TimeUnit; -import org.apache.kylin.aggregation.MeasureAggregators; import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.measure.MeasureAggregators; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +54,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { private int splitRowThreshold = Integer.MAX_VALUE; private int unitRows = 1000; - public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) { + public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { super(cubeDesc, dictionaryMap); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index a393179..5c59de7 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -28,9 +28,9 @@ import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.kylin.common.datatype.DoubleMutable; import org.apache.kylin.common.topn.Counter; import org.apache.kylin.common.topn.TopNCounter; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.common.util.Pair; @@ -38,8 +38,6 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.gridtable.CubeGridTable; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.gridtable.GTAggregateScanner; import org.apache.kylin.gridtable.GTBuilder; import org.apache.kylin.gridtable.GTInfo; @@ -47,6 +45,7 @@ import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GridTable; import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.metadata.datatype.DoubleMutable; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; @@ -66,7 +65,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { private final CuboidScheduler cuboidScheduler; private final long baseCuboidId; private final int totalCuboidCount; - private final CubeJoinedFlatTableDesc intermediateTableDesc; private final String[] metricsAggrFuncs; private final MeasureDesc[] measureDescs; private final int measureCount; @@ -81,12 +79,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { private Object[] totalSumForSanityCheck; private ICuboidCollector resultCollector; - public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) { + public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { super(cubeDesc, dictionaryMap); this.cuboidScheduler = new CuboidScheduler(cubeDesc); this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); this.totalCuboidCount = cuboidScheduler.getCuboidCount(); - this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null); this.measureCount = cubeDesc.getMeasures().size(); this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]); @@ -100,8 +97,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]); } - - private GridTable newGridTableByCuboidID(long cuboidID) throws IOException { GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap); @@ -114,7 +109,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { return gridTable; } - @Override public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException { ConcurrentNavigableMap<Long, CuboidResult> result = build(input); @@ -447,7 +441,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0); } - //@SuppressWarnings("unused") + @SuppressWarnings({ "unused", "rawtypes", "unchecked" }) private void sanityCheck(long parentId, long cuboidId, Object[] totalSum) { // double sum introduces error and causes result not exactly equal for (int i = 0; i < totalSum.length; i++) { @@ -508,9 +502,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { this.info = info; this.input = input; this.record = new GTRecord(info); - this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, - InMemCubeBuilderUtils.createTopNLiteralColDictionaryMap(cubeDesc, intermediateTableDesc, dictionaryMap), - info); + this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, dictionaryMap, info); } @Override http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java index bf4278a..fed9479 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java @@ -17,49 +17,39 @@ */ package org.apache.kylin.cube.inmemcubing; -import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Map; -import org.apache.kylin.aggregation.MeasureCodec; -import org.apache.kylin.common.datatype.LongMutable; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import org.apache.kylin.metadata.model.ParameterDesc; +import org.apache.kylin.metadata.model.TblColRef; /** */ public class InMemCubeBuilderInputConverter { - private static final LongMutable ONE = new LongMutable(1l); - - private final CubeDesc cubeDesc; private final CubeJoinedFlatTableDesc intermediateTableDesc; private final MeasureDesc[] measureDescs; - private final MeasureCodec measureCodec; + private final MeasureIngester<?>[] measureIngesters; private final int measureCount; - private final ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - private final Map<Integer, Dictionary<String>> topNLiteralColDictMap; + private final Map<TblColRef, Dictionary<String>> dictionaryMap; private final GTInfo gtInfo; - public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<Integer, Dictionary<String>> topNLiteralColDictMap, GTInfo gtInfo) { - this.cubeDesc = cubeDesc; + public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) { this.gtInfo = gtInfo; this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null); this.measureCount = cubeDesc.getMeasures().size(); this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]); - this.measureCodec = new MeasureCodec(cubeDesc.getMeasures()); - this.topNLiteralColDictMap = Preconditions.checkNotNull(topNLiteralColDictMap, "topNLiteralColDictMap cannot be null"); + this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures()); + this.dictionaryMap = dictionaryMap; } public final GTRecord convert(List<String> row) { @@ -89,59 +79,38 @@ public class InMemCubeBuilderInputConverter { } private Object[] buildValue(List<String> row) { - Object[] values = new Object[measureCount]; for (int i = 0; i < measureCount; i++) { - MeasureDesc measureDesc = measureDescs[i]; - int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i]; - FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction(); - if (flatTableIdx == null) { - values[i] = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue()); - } else if (function.isCount() || function.isHolisticCountDistinct()) { + values[i] = buildValueOf(i, row); + } + return values; + } + + private Object buildValueOf(int idxOfMeasure, List<String> row) { + MeasureDesc measure = measureDescs[idxOfMeasure]; + FunctionDesc function = measure.getFunction(); + int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure]; + + int paramCount = function.getParameterCount(); + String[] inputToMeasure = new String[paramCount]; + + // pick up parameter values + ParameterDesc param = function.getParameter(); + int paramColIdx = 0; // index among parameters of column type + for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) { + String value; + if (function.isCount() || function.isHolisticCountDistinct()) { // note for holistic count distinct, this value will be ignored - values[i] = ONE; - } else if (function.isTopN()) { - // encode the key column with dict, and get the counter column; - int keyColIndex = flatTableIdx[flatTableIdx.length - 1]; - Dictionary<String> literalColDict = topNLiteralColDictMap.get(keyColIndex); - int keyColEncoded = literalColDict.getIdFromValue(row.get(keyColIndex)); - valueBuf.clear(); - valueBuf.putInt(literalColDict.getSizeOfId()); - valueBuf.putInt(keyColEncoded); - if (flatTableIdx.length == 1) { - // only literalCol, use 1.0 as counter - valueBuf.putDouble(1.0); - } else { - // get the counter column value - valueBuf.putDouble(Double.valueOf(row.get(flatTableIdx[0]))); - } - - values[i] = measureCodec.getSerializer(i).valueOf(valueBuf.array()); - - } else if (flatTableIdx.length == 1) { - values[i] = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0]))); + value = "1"; + } else if (param.isColumnType()) { + value = row.get(colIdxOnFlatTable[paramColIdx++]); } else { - - byte[] result = null; - for (int x = 0; x < flatTableIdx.length; x++) { - byte[] split = toBytes(row.get(flatTableIdx[x])); - if (result == null) { - result = Arrays.copyOf(split, split.length); - } else { - byte[] newResult = new byte[result.length + split.length]; - System.arraycopy(result, 0, newResult, 0, result.length); - System.arraycopy(split, 0, newResult, result.length, split.length); - result = newResult; - } - } - values[i] = measureCodec.getSerializer(i).valueOf(result); + value = param.getValue(); } + inputToMeasure[i] = value; } - return values; - } - - private byte[] toBytes(String v) { - return v == null ? null : Bytes.toBytes(v); + + return measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java index 9d819a4..e8fa6d0 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java @@ -18,11 +18,12 @@ package org.apache.kylin.cube.inmemcubing; import com.google.common.collect.Maps; + +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java index 4316376..62432f7 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java @@ -21,10 +21,10 @@ package org.apache.kylin.cube.kv; import java.util.Map; import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java index 7fedd90..ba15b48 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java @@ -22,7 +22,7 @@ import java.util.Arrays; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.dict.IDictionaryAware; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java index a21fe9f..fea3736 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java @@ -21,7 +21,7 @@ package org.apache.kylin.cube.kv; import java.util.Collection; import java.util.Comparator; -import org.apache.kylin.common.datatype.DataType; +import org.apache.kylin.metadata.datatype.DataType; /** * @author yangli9 http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index ef563ed..3e8ee13 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -41,6 +41,7 @@ import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.util.Array; import org.apache.kylin.common.util.CaseInsensitiveStringMap; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; @@ -818,7 +819,8 @@ public class CubeDesc extends RootPersistentEntity { } for (MeasureDesc measure : measures) { - result.addAll(measure.getColumnsNeedDictionary()); + MeasureType aggrType = MeasureType.create(measure.getFunction()); + result.addAll(aggrType.getColumnsNeedDictionary(measure)); } return result; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java index 8e36009..1121621 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java @@ -26,12 +26,12 @@ import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.datatype.DataType; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.validation.IValidatorRule; import org.apache.kylin.cube.model.validation.ResultLevel; import org.apache.kylin.cube.model.validation.ValidateContext; import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java index 0cfd020..3e79226 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java @@ -50,9 +50,11 @@ import com.google.common.collect.Maps; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.hll.HyperLogLogPlusCounter; import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; @@ -146,7 +148,7 @@ public class CubingUtils { return result; } - public static Map<TblColRef, Dictionary<?>> buildDictionary(final CubeInstance cubeInstance, Iterable<List<String>> recordList) throws IOException { + public static Map<TblColRef, Dictionary<String>> buildDictionary(final CubeInstance cubeInstance, Iterable<List<String>> recordList) throws IOException { final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor().listDimensionColumnsExcludingDerived(); final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap(); int index = 0; @@ -154,7 +156,7 @@ public class CubingUtils { tblColRefMap.put(index++, column); } - HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap(); + HashMap<TblColRef, Dictionary<String>> result = Maps.newHashMap(); HashMultimap<TblColRef, String> valueMap = HashMultimap.create(); for (List<String> row : recordList) { @@ -173,18 +175,19 @@ public class CubingUtils { return input == null ? null : input.getBytes(); } }); - final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), new IterableDictionaryValueEnumerator(bytes)); + final Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), new IterableDictionaryValueEnumerator(bytes)); result.put(tblColRef, dict); } return result; } - public static Map<TblColRef, Dictionary<?>> writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) { - Map<TblColRef, Dictionary<?>> realDictMap = Maps.newHashMap(); + @SuppressWarnings("unchecked") + public static Map<TblColRef, Dictionary<String>> writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<String>> dictionaryMap, long startOffset, long endOffset) { + Map<TblColRef, Dictionary<String>> realDictMap = Maps.newHashMap(); - for (Map.Entry<TblColRef, Dictionary<?>> entry : dictionaryMap.entrySet()) { + for (Map.Entry<TblColRef, Dictionary<String>> entry : dictionaryMap.entrySet()) { final TblColRef tblColRef = entry.getKey(); - final Dictionary<?> dictionary = entry.getValue(); + final Dictionary<String> dictionary = entry.getValue(); ReadableTable.TableSignature signature = new ReadableTable.TableSignature(); signature.setLastModifiedTime(System.currentTimeMillis()); signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset)); @@ -195,7 +198,7 @@ public class CubingUtils { try { DictionaryInfo realDict = dictionaryManager.trySaveNewDict(dictionary, dictInfo); cubeSegment.putDictResPath(tblColRef, realDict.getResourcePath()); - realDictMap.put(tblColRef, realDict.getDictionaryObject()); + realDictMap.put(tblColRef, (Dictionary<String>) realDict.getDictionaryObject()); } catch (IOException e) { logger.error("error save dictionary for column:" + tblColRef, e); throw new RuntimeException("error save dictionary for column:" + tblColRef, e); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java b/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java index f7623e3..714571f 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java @@ -1,7 +1,7 @@ package org.apache.kylin.gridtable; import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.common.util.Dictionary; public class DefaultGTComparator implements IGTComparator { @Override http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 01696e8..eb8d212 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -7,10 +7,10 @@ import java.util.Iterator; import java.util.Map.Entry; import java.util.SortedMap; -import org.apache.kylin.aggregation.MeasureAggregator; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.MemoryBudgetController; +import org.apache.kylin.measure.MeasureAggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java index 229c679..d3a03d1 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java @@ -5,10 +5,10 @@ import java.util.BitSet; import java.util.Iterator; import java.util.LinkedList; -import org.apache.kylin.common.datatype.DataType; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.gridtable.CubeCodeSystem; import org.apache.kylin.cube.util.KryoUtils; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.TblColRef; public class GTInfo { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java index 2783f55..b3133be 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java @@ -2,9 +2,9 @@ package org.apache.kylin.gridtable; import java.nio.ByteBuffer; -import org.apache.kylin.aggregation.MeasureAggregator; -import org.apache.kylin.common.datatype.DataTypeSerializer; import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.measure.MeasureAggregator; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; @SuppressWarnings({ "rawtypes", "unchecked" }) /** http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java index 0e61cf2..3a22091 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java @@ -2,8 +2,8 @@ package org.apache.kylin.gridtable; import java.nio.ByteBuffer; -import org.apache.kylin.aggregation.MeasureAggregator; import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.measure.MeasureAggregator; public interface IGTCodeSystem { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java index f8d7f30..ff71b4f 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java @@ -22,11 +22,11 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.List; -import org.apache.kylin.common.datatype.DataType; -import org.apache.kylin.common.datatype.LongMutable; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.gridtable.GTInfo.Builder; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.datatype.LongMutable; public class UnitTestSupport { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java deleted file mode 100644 index 8ae44b6..0000000 --- a/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java +++ /dev/null @@ -1,60 +0,0 @@ -package org.apache.kylin.aggregation.topn; - -import java.nio.ByteBuffer; - -import org.apache.kylin.common.datatype.DataType; -import org.apache.kylin.common.topn.TopNCounter; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; -import org.junit.Assert; -import org.junit.Test; - -/** - * - */ -public class TopNCounterSerializerTest { - - private static TopNCounterSerializer serializer = new TopNCounterSerializer(DataType.getType("topn(10)")); - - @Test - public void testSerialization() { - TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50); - Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 }; - for (Integer i : stream) { - vs.offer(new ByteArray(Bytes.toBytes(i))); - } - - ByteBuffer out = ByteBuffer.allocate(1024); - serializer.serialize(vs, out); - - byte[] copyBytes = new byte[out.position()]; - System.arraycopy(out.array(), 0, copyBytes, 0, out.position()); - - ByteBuffer in = ByteBuffer.wrap(copyBytes); - TopNCounter<ByteArray> vsNew = serializer.deserialize(in); - - Assert.assertEquals(vs.toString(), vsNew.toString()); - - } - - @Test - public void testValueOf() { - - TopNCounter<ByteArray> origin = new TopNCounter<ByteArray>(10); - ByteArray key = new ByteArray(1); - ByteBuffer byteBuffer = key.asBuffer(); - BytesUtil.writeVLong(20l, byteBuffer); - origin.offer(key, 1.0); - - byteBuffer = ByteBuffer.allocate(1024); - byteBuffer.putInt(1); - byteBuffer.putInt(20); - byteBuffer.putDouble(1.0); - TopNCounter<ByteArray> counter = serializer.valueOf(byteBuffer.array()); - - - Assert.assertEquals(origin.toString(), counter.toString()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java index d7feb56..bce228d 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java @@ -23,10 +23,10 @@ import static org.junit.Assert.assertTrue; import java.util.HashSet; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.metadata.model.TblColRef; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java index 935e840..c25bad7 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java @@ -26,10 +26,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.dict.Dictionary; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.model.TblColRef; import org.junit.AfterClass; @@ -51,7 +51,7 @@ public class DoggedCubeBuilderStressTest extends LocalFileMetadataTestCase { private static CubeInstance cube; private static String flatTable; - private static Map<TblColRef, Dictionary<?>> dictionaryMap; + private static Map<TblColRef, Dictionary<String>> dictionaryMap; @BeforeClass public static void before() throws IOException {