This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 60ad2aa86de528209b898c2d2bb343bcec1c3d7e Author: Bowen Song <bowen.s...@kyligence.io> AuthorDate: Thu Nov 10 11:23:54 2022 +0800 KYLIN-5397 Support sum_lc function * Support sum_lc function * Remove wrap sum_lc type * Remove rewrite logic, and unnecessary check * fix part ut --- .../kylin/rest/service/AccessServiceTest.java | 9 +- .../kylin/rest/service/AclTCRServiceTest.java | 4 +- .../java/org/apache/kylin/common/msg/Message.java | 2 +- .../org/apache/kylin/common/util/DateFormat.java | 29 ++- .../apache/kylin/common/util/DateFormatTest.java | 51 ++++ .../apache/kylin/measure/MeasureTypeFactory.java | 2 + .../apache/kylin/measure/sumlc/SumLCCounter.java | 96 ++++++++ .../kylin/measure/sumlc/SumLCMeasureType.java | 172 +++++++++++++ .../apache/kylin/metadata/datatype/DataType.java | 10 +- .../apache/kylin/metadata/model/FunctionDesc.java | 68 ++++-- .../metadata/model/util/FunctionDescTest.java | 12 +- .../metadata/project/NProjectManagerTest.java | 5 +- .../localmeta/metadata/_global/project/sum_lc.json | 35 +++ .../f35f2937-9e4d-347a-7465-d64df939e7d6.json | 13 + .../f35f2937-9e4d-347a-7465-d64df939e7d6.json | 43 ++++ .../f35f2937-9e4d-347a-7465-d64df939e7d6.json | 267 +++++++++++++++++++++ .../metadata/sum_lc/table/SSB.SUMLC_EXTEND_4X.json | 78 ++++++ .../kylin/rest/service/ProjectServiceTest.java | 8 +- .../kylin/query/relnode/KapAggregateRel.java | 21 +- .../kylin/query/relnode/OLAPAggregateRel.java | 2 +- .../rest/service/QueryHistoryServiceTest.java | 2 +- .../kylin/query/engine/QueryRoutingEngine.java | 15 ++ .../kylin/query/engine/AsyncQueryJobTest.java | 6 +- .../kylin/query/engine/QueryRoutingEngineTest.java | 16 ++ .../kylin/engine/spark/job/CuboidAggregator.scala | 41 ++-- .../kylin/query/runtime/plan/AggregatePlan.scala | 11 +- .../scala/org/apache/spark/sql/KapFunctions.scala | 14 +- .../sql/catalyst/expressions/ExpressionUtils.scala | 31 ++- .../sql/catalyst/expressions/KapExpresssions.scala | 64 ++++- .../org/apache/spark/sql/udf/SparderAggFun.scala | 11 +- .../apache/spark/sql/LayoutEntityConverter.scala | 6 +- .../spark/sql/udaf/NullSafeValueSerializer.scala | 30 ++- .../scala/org/apache/spark/sql/udaf/SumLC.scala | 191 +++++++++++++++ 33 files changed, 1256 insertions(+), 109 deletions(-) diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java index fb102f63de..b0f9142756 100644 --- a/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java @@ -48,6 +48,7 @@ import org.apache.kylin.common.persistence.AclEntity; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.user.ManagedUser; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.request.AccessRequest; import org.apache.kylin.rest.request.GlobalAccessRequest; @@ -98,8 +99,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.kylin.metadata.user.ManagedUser; - @RunWith(PowerMockRunner.class) @PrepareForTest({ SpringContext.class, UserGroupInformation.class, KylinConfig.class, NProjectManager.class }) public class AccessServiceTest extends NLocalFileMetadataTestCase { @@ -597,14 +596,14 @@ public class AccessServiceTest extends NLocalFileMetadataTestCase { @Test public void testGetGrantedProjectsOfUser() throws IOException { List<String> result = accessService.getGrantedProjectsOfUser("ADMIN"); - assertEquals(27, result.size()); + assertEquals(28, result.size()); } @Test public void testGetGrantedProjectsOfUserOrGroup() throws IOException { // admin user List<String> result = accessService.getGrantedProjectsOfUserOrGroup("ADMIN", true); - assertEquals(27, result.size()); + assertEquals(28, result.size()); // normal user result = accessService.getGrantedProjectsOfUserOrGroup("ANALYST", true); @@ -786,7 +785,7 @@ public class AccessServiceTest extends NLocalFileMetadataTestCase { } @Test - public void testAclWithUnNaturalOrderUpdate() throws IOException{ + public void testAclWithUnNaturalOrderUpdate() throws IOException { AclEntity ae = accessService.getAclEntity(AclEntityType.PROJECT_INSTANCE, "1eaca32a-a33e-4b69-83dd-0bb8b1f8c91b"); diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java index bb994174a9..9a0ee62d7d 100644 --- a/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/AclTCRServiceTest.java @@ -311,7 +311,7 @@ public class AclTCRServiceTest extends NLocalFileMetadataTestCase { } private SensitiveDataMask.MaskType getColumnDataMask(AclTCRRequest acl, String database, String table, - String column) { + String column) { if (acl.getDatabaseName().equals(database)) { for (val tb : acl.getTables()) { if (tb.getTableName().equals(table)) { @@ -1316,7 +1316,7 @@ public class AclTCRServiceTest extends NLocalFileMetadataTestCase { Mockito.when(userService.isGlobalAdmin("ADMIN")).thenReturn(true); List<SidPermissionWithAclResponse> responses = accessService.getUserOrGroupAclPermissions(projects, "ADMIN", true); - Assert.assertEquals(27, responses.size()); + Assert.assertEquals(28, responses.size()); Assert.assertTrue(responses.stream().allMatch(response -> "ADMIN".equals(response.getProjectPermission()))); // test normal group diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java index 365474db28..2048ce9bc1 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java @@ -1015,7 +1015,7 @@ public class Message { } public String getInvalidTimeFormat() { - return "Can’t set the time partition column. The values of the selected column is not time formatted. Please select again."; + return "Can’t set the time partition column. The values of the selected column is not time formatted: {%s}. Please select again."; } public String getSegmentMergeStorageCheckError() { diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/src/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java index e5391e3b16..fd8dd243fa 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java @@ -64,6 +64,10 @@ public class DateFormat { public static final String DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS = "yyyy-MM-dd HH:mm:ss.SSS"; public static final String DEFAULT_DATETIME_PATTERN_WITH_TIMEZONE = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + private static final int NANOS_TIMESTAMP_LENGTH = 16; + private static final int MILLIS_TIMESTAMP_LENGTH = 13; + private static final int SECONDS_TIMESTAMP_LENGTH = 10; + static final private Map<String, FastDateFormat> formatMap = new ConcurrentHashMap<String, FastDateFormat>(); private static final Map<String, String> dateFormatRegex = Maps.newHashMap(); @@ -210,13 +214,26 @@ public class DateFormat { return stringToDate(str, regexToPattern.getValue()).getTime(); } - // try parse it as days to epoch try { - long daysToEpoch = Long.parseLong(str); - return daysToEpoch * 24 * 60 * 60 * 1000; + long strToDigit = Long.parseLong(str); + if (strToDigit > 0) { + if (str.length() == NANOS_TIMESTAMP_LENGTH) { + return strToDigit / 1000; + } else if (str.length() == MILLIS_TIMESTAMP_LENGTH) { + return strToDigit; + } else if (str.length() == SECONDS_TIMESTAMP_LENGTH) { + return strToDigit * 1000; + } else { + // try parse it as days to epoch + return strToDigit * 24 * 60 * 60 * 1000; + } + } } catch (NumberFormatException e) { + throw new KylinException(INVALID_TIME_PARTITION_COLUMN, + String.format(Locale.ROOT, MsgPicker.getMsg().getInvalidTimeFormat(), str), e); } - throw new KylinException(INVALID_TIME_PARTITION_COLUMN, MsgPicker.getMsg().getInvalidTimeFormat()); + throw new KylinException(INVALID_TIME_PARTITION_COLUMN, + String.format(Locale.ROOT, MsgPicker.getMsg().getInvalidTimeFormat(), str)); } public static boolean isSupportedDateFormat(String dateStr) { @@ -244,7 +261,8 @@ public class DateFormat { if (sampleData.matches(patternMap.getKey())) return patternMap.getValue(); } - throw new KylinException(INVALID_TIME_PARTITION_COLUMN, MsgPicker.getMsg().getInvalidTimeFormat()); + throw new KylinException(INVALID_TIME_PARTITION_COLUMN, + String.format(Locale.ROOT, MsgPicker.getMsg().getInvalidTimeFormat(), sampleData)); } /** @@ -291,4 +309,5 @@ public class DateFormat { || DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS.equals(format) || DEFAULT_DATETIME_PATTERN_WITH_TIMEZONE.equals(format)); } + } diff --git a/src/core-common/src/test/java/org/apache/kylin/common/util/DateFormatTest.java b/src/core-common/src/test/java/org/apache/kylin/common/util/DateFormatTest.java index 9f5364e8cc..0387906235 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/util/DateFormatTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/util/DateFormatTest.java @@ -24,12 +24,14 @@ import java.time.format.DateTimeParseException; import java.util.Arrays; import java.util.Date; +import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.junit.annotation.MultiTimezoneTest; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.junit.Assert; +import org.junit.Test; /** * Created by dongli on 1/4/16. @@ -189,4 +191,53 @@ public class DateFormatTest { } } + + @Test + public void testStringToMillis() { + // 2022-12-01 00:00:00 + long expectedMillis = 1669824000000L; + + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01 00:00")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("202212")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01 00:00:00")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01 00:00:00:000")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01 00:00:00:000")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01 00:00")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01 00:00:00.000")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01 00:00:00:000")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201 00:00")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01 00:00")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201 00:00:00:000")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201 00:00:00")); + } + + @Test + public void testStringToMillisSupplement() { + long expectedMillis = 1669824000000L; + + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01 00:00:00")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201T00:00:00.000Z")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01T00:00:00.000Z")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01 00:00:00")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("20221201 00:00:00.000")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01T00:00:00.000Z")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01T00:00:00.000Z")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022.12.01 00:00:00.000")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022-12-01T00:00:00.000+08:00")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("2022/12/01 00:00:00.000")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("1669824000000000")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("1669824000000")); + Assert.assertEquals(expectedMillis, DateFormat.stringToMillis("1669824000")); + } + + @Test + public void testUnsupportedStringToMillis() { + Assert.assertThrows(KylinException.class, () -> DateFormat.stringToMillis("12/01")); + Assert.assertThrows(KylinException.class, () -> DateFormat.stringToMillis("-12345")); + } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/src/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java index ff812fcabf..48344c876e 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java @@ -34,6 +34,7 @@ import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType; import org.apache.kylin.measure.hllc.HLLCMeasureType; import org.apache.kylin.measure.percentile.PercentileMeasureType; import org.apache.kylin.measure.raw.RawMeasureType; +import org.apache.kylin.measure.sumlc.SumLCMeasureType; import org.apache.kylin.measure.topn.TopNMeasureType; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.DataTypeSerializer; @@ -120,6 +121,7 @@ abstract public class MeasureTypeFactory<T> { factoryInsts.add(new IntersectMeasureType.Factory()); factoryInsts.add(new CollectSetMeasureType.Factory()); factoryInsts.add(new CorrMeasureType.Factory()); + factoryInsts.add(new SumLCMeasureType.Factory()); logger.info("Checking custom measure types from kylin config"); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCCounter.java b/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCCounter.java new file mode 100644 index 0000000000..dfca82230b --- /dev/null +++ b/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCCounter.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.sumlc; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.Map; +import java.util.function.BiFunction; + +import com.google.common.collect.Maps; + +public class SumLCCounter implements Serializable { + private static final Map<String, BiFunction<Number, Number, Number>> MERGE_FUNC_MAP = Maps.newHashMap(); + + static { + MERGE_FUNC_MAP.put(Long.class.getSimpleName(), (s1, s2) -> Long.sum((Long) s1, (Long) s2)); + MERGE_FUNC_MAP.put(Double.class.getSimpleName(), (s1, s2) -> Double.sum((Double) s1, (Double) s2)); + MERGE_FUNC_MAP.put(BigDecimal.class.getSimpleName(), (s1, s2) -> ((BigDecimal) s1).add((BigDecimal) s2)); + } + + Number sumLC; + Long timestamp; + + public SumLCCounter() { + + } + + public SumLCCounter(Number sumLC, Long timestamp) { + this.sumLC = numericTypeConversion(sumLC); + this.timestamp = timestamp; + } + + public static SumLCCounter merge(SumLCCounter current, Number sumLC, Long timestamp) { + SumLCCounter merged = new SumLCCounter(sumLC, timestamp); + return merge(current, merged); + } + + public static SumLCCounter merge(SumLCCounter value1, SumLCCounter value2) { + if (value1 == null || value1.timestamp == null) + return value2; + if (value2 == null || value2.timestamp == null) + return value1; + if (value2.timestamp > value1.timestamp) { + return value2; + } else if (value1.timestamp > value2.timestamp) { + return value1; + } else { + return mergeSum(value1, value2); + } + } + + private static SumLCCounter mergeSum(SumLCCounter cnt1, SumLCCounter cnt2) { + if (cnt1.sumLC == null) + return cnt2; + if (cnt2.sumLC == null) + return cnt1; + String sumLCTypeName = cnt1.sumLC.getClass().getSimpleName(); + Number semiSum = MERGE_FUNC_MAP.get(sumLCTypeName).apply(cnt1.sumLC, cnt2.sumLC); + return new SumLCCounter(semiSum, cnt1.timestamp); + } + + private static Number numericTypeConversion(Number input) { + if (input instanceof Byte || input instanceof Short || input instanceof Integer) { + return input.longValue(); + } else if (input instanceof Float) { + return input.doubleValue(); + } else { + return input; + } + } + + public Number getSumLC() { + return sumLC; + } + + public Long getTimestamp() { + return timestamp; + } + +} diff --git a/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCMeasureType.java b/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCMeasureType.java new file mode 100644 index 0000000000..c94c29e54d --- /dev/null +++ b/src/core-metadata/src/main/java/org/apache/kylin/measure/sumlc/SumLCMeasureType.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.sumlc; + +import java.nio.ByteBuffer; +import java.util.Map; + +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.measure.MeasureAggregator; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.measure.MeasureType; +import org.apache.kylin.measure.MeasureTypeFactory; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; + +import com.google.common.collect.ImmutableMap; + +public class SumLCMeasureType extends MeasureType<SumLCCounter> { + public static final String FUNC_SUM_LC = "SUM_LC"; + public static final String DATATYPE_SUM_LC = "sum_lc"; + static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.of(SumLCMeasureType.FUNC_SUM_LC, SumLCAggFunc.class); + + public SumLCMeasureType(String funcName, DataType dataType) { + + } + + @Override + public MeasureIngester<SumLCCounter> newIngester() { + return new MeasureIngester<SumLCCounter>() { + @Override + public SumLCCounter valueOf(String[] values, MeasureDesc measureDesc, + Map<TblColRef, Dictionary<String>> dictionaryMap) { + return null; + } + }; + } + + @Override + public MeasureAggregator<SumLCCounter> newAggregator() { + return new MeasureAggregator<SumLCCounter>() { + @Override + public void reset() { + // left over issue, default implementation ignored + } + + @Override + public void aggregate(SumLCCounter value) { + // left over issue, default implementation ignored + } + + @Override + public SumLCCounter aggregate(SumLCCounter value1, SumLCCounter value2) { + return null; + } + + @Override + public SumLCCounter getState() { + return null; + } + + @Override + public int getMemBytesEstimate() { + return 0; + } + }; + } + + @Override + public boolean needRewrite() { + return true; + } + + @Override + public Map<String, Class<?>> getRewriteCalciteAggrFunctions() { + return UDAF_MAP; + } + + public static class Factory extends MeasureTypeFactory<SumLCCounter> { + + @Override + public MeasureType<SumLCCounter> createMeasureType(String funcName, DataType dataType) { + return new SumLCMeasureType(funcName, dataType); + } + + @Override + public String getAggrFunctionName() { + return FUNC_SUM_LC; + } + + @Override + public String getAggrDataTypeName() { + return DATATYPE_SUM_LC; + } + + @Override + public Class<? extends DataTypeSerializer<SumLCCounter>> getAggrDataTypeSerializer() { + return SumLCSerializer.class; + } + } + + /** + * This class is used for registering sum_lc to calcite schema, no need to implement the functions + */ + public static class SumLCAggFunc { + + public static SumLCCounter init() { + return null; + } + + public static SumLCCounter add(SumLCCounter cur, Object v, Object r) { + return null; + } + + public static SumLCCounter merge(SumLCCounter counter0, SumLCCounter counter1) { + return null; + } + + public static Object result(SumLCCounter counter) { + return null; + } + } + + public static class SumLCSerializer extends DataTypeSerializer<SumLCCounter> { + + public SumLCSerializer(DataType dataType) { + + } + + @Override + public void serialize(SumLCCounter value, ByteBuffer out) { + // left over issue, default implementation ignored + } + + @Override + public SumLCCounter deserialize(ByteBuffer in) { + return null; + } + + @Override + public int peekLength(ByteBuffer in) { + return 0; + } + + @Override + public int maxLength() { + return 0; + } + + @Override + public int getStorageBytesEstimate() { + return 0; + } + } +} diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java index f9ef1bc103..ce7081c1bc 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java @@ -171,9 +171,9 @@ public class DataType implements Serializable { } public static DataType getType(String type) { - if (type == null) + if (type == null) { return null; - + } DataType dataType = new DataType(type); DataType cached = CACHE.get(dataType); if (cached == null) { @@ -409,10 +409,13 @@ public class DataType implements Serializable { public static final BytesSerializer<DataType> serializer = new BytesSerializer<DataType>() { @Override public void serialize(DataType value, ByteBuffer out) { + serializeDataType(value, out); + } + + private void serializeDataType(DataType value, ByteBuffer out) { BytesUtil.writeUTFString(value.name, out); BytesUtil.writeVInt(value.precision, out); BytesUtil.writeVInt(value.scale, out); - } @Override @@ -420,7 +423,6 @@ public class DataType implements Serializable { String name = BytesUtil.readUTFString(in); int precision = BytesUtil.readVInt(in); int scale = BytesUtil.readVInt(in); - return new DataType(name, precision, scale); } }; diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java index c66ed0ce8e..b2a212f84f 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java @@ -43,6 +43,7 @@ import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; +import org.apache.directory.api.util.Strings; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.MeasureTypeFactory; @@ -55,6 +56,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -91,21 +93,31 @@ public class FunctionDesc implements Serializable { public static String proposeReturnType(String expression, String colDataType, Map<String, String> override, boolean saveCheck) { - String returnType = override.getOrDefault(expression, - EXPRESSION_DEFAULT_TYPE_MAP.getOrDefault(expression, colDataType)); - if (saveCheck && colDataType != null && DataType.getType(colDataType).isStringFamily()) { + if (saveCheck) { switch (expression) { case FunctionDesc.FUNC_SUM: - case FunctionDesc.FUNC_PERCENTILE: - throw new KylinException(INVALID_MEASURE_DATA_TYPE, - String.format(Locale.ROOT, "Invalid column type %s for measure %s", colDataType, expression)); + case FunctionDesc.FUNC_PERCENTILE: { + if (colDataType != null && DataType.getType(colDataType).isStringFamily()) { + throw new KylinException(INVALID_MEASURE_DATA_TYPE, String.format(Locale.ROOT, + "Invalid column type %s for measure %s", colDataType, expression)); + } + break; + } + case FunctionDesc.FUNC_SUM_LC: { + Preconditions.checkArgument(Strings.isNotEmpty(colDataType), + "SUM_LC Measure's input type shouldn't be null or empty"); + checkSumLCDataType(colDataType); + break; + } default: break; } } - switch (expression) { - case FunctionDesc.FUNC_SUM: + String returnType = override.getOrDefault(expression, + EXPRESSION_DEFAULT_TYPE_MAP.getOrDefault(expression, colDataType)); + // widen return type for sum or sum_lc measure + if (FunctionDesc.FUNC_SUM.equals(expression) || FunctionDesc.FUNC_SUM_LC.equals(expression)) { if (colDataType != null) { DataType type = DataType.getType(returnType); if (type.isIntegerFamily()) { @@ -119,13 +131,19 @@ public class FunctionDesc implements Serializable { } else { returnType = "decimal(19,4)"; } - break; - default: - break; } return returnType; } + private static void checkSumLCDataType(String dataTypeName) { + DataType dataType = DataType.getType(dataTypeName); + if (!dataType.isNumberFamily()) { + throw new KylinException(INVALID_MEASURE_DATA_TYPE, + String.format(Locale.ROOT, "SUM_LC Measure's return type '%s' is illegal. It must be one of %s", + dataType, DataType.NUMBER_FAMILY)); + } + } + public static final String FUNC_SUM = "SUM"; public static final String FUNC_MIN = "MIN"; public static final String FUNC_MAX = "MAX"; @@ -147,6 +165,7 @@ public class FunctionDesc implements Serializable { public static final String FUNC_PERCENTILE = "PERCENTILE_APPROX"; public static final String FUNC_GROUPING = "GROUPING"; public static final String FUNC_TOP_N = "TOP_N"; + public static final String FUNC_SUM_LC = "SUM_LC"; public static final ImmutableSet<String> DIMENSION_AS_MEASURES = ImmutableSet.<String> builder() .add(FUNC_MAX, FUNC_MIN, FUNC_COUNT_DISTINCT).build(); public static final ImmutableSet<String> NOT_SUPPORTED_FUNCTION = ImmutableSet.<String> builder().build(); @@ -195,19 +214,30 @@ public class FunctionDesc implements Serializable { for (ParameterDesc p : getParameters()) { if (p.isColumnType()) { TblColRef colRef = model.findColumn(p.getValue()); - returnDataType = DataType.getType( - proposeReturnType(expression, colRef.getDatatype(), Maps.newHashMap(), model.isSaveCheck())); p.setValue(colRef.getIdentity()); p.setColRef(colRef); + if (expression.equals(FUNC_SUM_LC)) { + if (Objects.isNull(returnDataType)) { + // use the first column to init returnType and returnDataType, ignore the second timestamp column + returnType = proposeReturnType(expression, colRef.getDatatype(), Maps.newHashMap(), + model.isSaveCheck()); + returnDataType = DataType.getType(returnType); + } + } else { + returnDataType = DataType.getType(proposeReturnType(expression, colRef.getDatatype(), + Maps.newHashMap(), model.isSaveCheck())); + } } } - if (returnDataType == null) { - returnDataType = DataType.getType(BIGINT); - } - if (!StringUtils.isEmpty(returnType)) { - returnDataType = DataType.getType(returnType); + if (!expression.equals(FUNC_SUM_LC)) { + if (returnDataType == null) { + returnDataType = DataType.getType(BIGINT); + } + if (!StringUtils.isEmpty(returnType)) { + returnDataType = DataType.getType(returnType); + } + returnType = returnDataType.toString(); } - returnType = returnDataType.toString(); } private void reInitMeasureType() { diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/util/FunctionDescTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/util/FunctionDescTest.java index 226b8ac3b7..77ed4f6ecd 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/util/FunctionDescTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/util/FunctionDescTest.java @@ -26,14 +26,14 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.NDataModel; +import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.common.util.NLocalFileMetadataTestCase; -import org.apache.kylin.metadata.model.NDataModel; -import org.apache.kylin.metadata.model.NDataModelManager; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -86,6 +86,12 @@ public class FunctionDescTest extends NLocalFileMetadataTestCase { Assert.fail(); } catch (KylinException ignored) { } + Assert.assertThrows(IllegalArgumentException.class, + () -> FunctionDesc.proposeReturnType("SUM_LC", "", Maps.newHashMap(), true)); + Assert.assertThrows(KylinException.class, + () -> FunctionDesc.proposeReturnType("SUM_LC", "char", Maps.newHashMap(), true)); + String returnType = FunctionDesc.proposeReturnType("SUM_LC", "bigint", Maps.newHashMap(), true); + Assert.assertEquals("bigint", returnType); } @Test diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java index d87805c140..0d2e6cbaf3 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/project/NProjectManagerTest.java @@ -26,10 +26,9 @@ import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; -import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.hystrix.NCircuitBreaker; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; -import org.apache.kylin.metadata.project.ProjectInstance; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -69,7 +68,7 @@ public class NProjectManagerTest extends NLocalFileMetadataTestCase { } val projects = projectManager.listAllProjects(); - Assert.assertEquals(27, projects.size()); + Assert.assertEquals(28, projects.size()); Assert.assertTrue(projects.stream().noneMatch(p -> p.getName().equals("test"))); } diff --git a/src/examples/test_case_data/localmeta/metadata/_global/project/sum_lc.json b/src/examples/test_case_data/localmeta/metadata/_global/project/sum_lc.json new file mode 100644 index 0000000000..2fdd47af4f --- /dev/null +++ b/src/examples/test_case_data/localmeta/metadata/_global/project/sum_lc.json @@ -0,0 +1,35 @@ +{ + "uuid" : "d1ceb3a4-4d2c-27af-5b7d-92c8a4055776", + "last_modified" : 1667464991353, + "create_time" : 1667464991347, + "version" : "4.0.0.0", + "name" : "sum_lc", + "owner" : "ADMIN", + "status" : "ENABLED", + "create_time_utc" : 1667464991347, + "default_database" : "DEFAULT", + "description" : "", + "principal" : null, + "keytab" : null, + "maintain_model_type" : "MANUAL_MAINTAIN", + "override_kylin_properties" : { + "kylin.metadata.semi-automatic-mode" : "false", + "kylin.query.metadata.expose-computed-column" : "true", + "kylin.source.default" : "9" + }, + "segment_config" : { + "auto_merge_enabled" : false, + "auto_merge_time_ranges" : [ "WEEK", "MONTH", "QUARTER", "YEAR" ], + "volatile_range" : { + "volatile_range_number" : 0, + "volatile_range_enabled" : false, + "volatile_range_type" : "DAY" + }, + "retention_range" : { + "retention_range_number" : 1, + "retention_range_enabled" : false, + "retention_range_type" : "MONTH" + }, + "create_empty_segment_enabled" : false + } +} \ No newline at end of file diff --git a/src/examples/test_case_data/localmeta/metadata/sum_lc/dataflow/f35f2937-9e4d-347a-7465-d64df939e7d6.json b/src/examples/test_case_data/localmeta/metadata/sum_lc/dataflow/f35f2937-9e4d-347a-7465-d64df939e7d6.json new file mode 100644 index 0000000000..f5f39f4889 --- /dev/null +++ b/src/examples/test_case_data/localmeta/metadata/sum_lc/dataflow/f35f2937-9e4d-347a-7465-d64df939e7d6.json @@ -0,0 +1,13 @@ +{ + "uuid" : "f35f2937-9e4d-347a-7465-d64df939e7d6", + "last_modified" : 1667467391143, + "create_time" : 1667465241650, + "version" : "4.0.0.0", + "status" : "ONLINE", + "last_status" : null, + "cost" : 50, + "query_hit_count" : 0, + "last_query_time" : 0, + "layout_query_hit_count" : { }, + "segments" : [ ] +} \ No newline at end of file diff --git a/src/examples/test_case_data/localmeta/metadata/sum_lc/index_plan/f35f2937-9e4d-347a-7465-d64df939e7d6.json b/src/examples/test_case_data/localmeta/metadata/sum_lc/index_plan/f35f2937-9e4d-347a-7465-d64df939e7d6.json new file mode 100644 index 0000000000..9b29b1fd06 --- /dev/null +++ b/src/examples/test_case_data/localmeta/metadata/sum_lc/index_plan/f35f2937-9e4d-347a-7465-d64df939e7d6.json @@ -0,0 +1,43 @@ +{ + "uuid" : "f35f2937-9e4d-347a-7465-d64df939e7d6", + "last_modified" : 1667467391117, + "create_time" : 1667293061109, + "version" : "4.0.0.0", + "description" : null, + "rule_based_index" : { + "dimensions" : [ 1, 6 ], + "measures" : [ 100000, 100001, 100002, 100003, 100004, 100005, 100006, 100007, 100008 ], + "global_dim_cap" : null, + "aggregation_groups" : [ { + "includes" : [ 1, 6 ], + "measures" : [ 100000, 100001, 100002, 100003, 100004, 100005, 100006, 100007, 100008 ], + "select_rule" : { + "hierarchy_dims" : [ ], + "mandatory_dims" : [ ], + "joint_dims" : [ ] + }, + "index_range" : "EMPTY" + } ], + "layout_id_mapping" : [ 30001, 40001, 50001 ], + "parent_forward" : 3, + "index_start_id" : 30000, + "last_modify_time" : 1667370467484, + "layout_black_list" : [ ], + "scheduler_version" : 2, + "index_update_enabled" : true, + "base_layout_enabled" : true + }, + "indexes" : [ ], + "override_properties" : { }, + "to_be_deleted_indexes" : [ ], + "auto_merge_time_ranges" : null, + "retention_range" : 0, + "engine_type" : 80, + "next_aggregation_index_id" : 60000, + "next_table_index_id" : 20000000000, + "agg_shard_by_columns" : [ ], + "extend_partition_columns" : [ ], + "layout_bucket_num" : { }, + "approved_additional_recs" : 0, + "approved_removal_recs" : 0 +} \ No newline at end of file diff --git a/src/examples/test_case_data/localmeta/metadata/sum_lc/model_desc/f35f2937-9e4d-347a-7465-d64df939e7d6.json b/src/examples/test_case_data/localmeta/metadata/sum_lc/model_desc/f35f2937-9e4d-347a-7465-d64df939e7d6.json new file mode 100644 index 0000000000..ddc2b999de --- /dev/null +++ b/src/examples/test_case_data/localmeta/metadata/sum_lc/model_desc/f35f2937-9e4d-347a-7465-d64df939e7d6.json @@ -0,0 +1,267 @@ +{ + "uuid" : "f35f2937-9e4d-347a-7465-d64df939e7d6", + "last_modified" : 1667465241644, + "create_time" : 1667293060578, + "version" : "4.0.0.0", + "alias" : "sum_lc_multi_data_type_test", + "owner" : "ADMIN", + "config_last_modifier" : null, + "config_last_modified" : 0, + "description" : null, + "fact_table" : "SSB.SUMLC_EXTEND_4X", + "fact_table_alias" : null, + "management_type" : "MODEL_BASED", + "join_tables" : [ ], + "filter_condition" : "", + "partition_desc" : { + "partition_date_column" : "SUMLC_EXTEND_4X.TX_DATE", + "partition_date_start" : 0, + "partition_date_format" : "yyyy-MM-dd", + "partition_type" : "APPEND", + "partition_condition_builder" : "org.apache.kylin.metadata.model.PartitionDesc$DefaultPartitionConditionBuilder" + }, + "capacity" : "MEDIUM", + "segment_config" : { + "auto_merge_enabled" : null, + "auto_merge_time_ranges" : null, + "volatile_range" : null, + "retention_range" : null, + "create_empty_segment_enabled" : false + }, + "data_check_desc" : null, + "semantic_version" : 0, + "storage_type" : 0, + "model_type" : "BATCH", + "all_named_columns" : [ { + "id" : 0, + "name" : "INT_DATA", + "column" : "SUMLC_EXTEND_4X.INT_DATA" + }, { + "id" : 1, + "name" : "ACCOUNT", + "column" : "SUMLC_EXTEND_4X.ACCOUNT", + "status" : "DIMENSION" + }, { + "id" : 2, + "name" : "TINYINT_DATA", + "column" : "SUMLC_EXTEND_4X.TINYINT_DATA" + }, { + "id" : 3, + "name" : "DOUBLE_DATA", + "column" : "SUMLC_EXTEND_4X.DOUBLE_DATA" + }, { + "id" : 4, + "name" : "SMALLINT_DATA", + "column" : "SUMLC_EXTEND_4X.SMALLINT_DATA" + }, { + "id" : 5, + "name" : "DECIMAL_DATA", + "column" : "SUMLC_EXTEND_4X.DECIMAL_DATA" + }, { + "id" : 6, + "name" : "TX_DATE", + "column" : "SUMLC_EXTEND_4X.TX_DATE", + "status" : "DIMENSION" + }, { + "id" : 7, + "name" : "MILLIS", + "column" : "SUMLC_EXTEND_4X.MILLIS" + }, { + "id" : 8, + "name" : "FLOAT_DATA", + "column" : "SUMLC_EXTEND_4X.FLOAT_DATA" + }, { + "id" : 9, + "name" : "BIGINT_DATA", + "column" : "SUMLC_EXTEND_4X.BIGINT_DATA" + }, { + "id" : 10, + "name" : "TINYINT_CC", + "column" : "SUMLC_EXTEND_4X.TINYINT_CC" + } ], + "all_measures" : [ { + "name" : "COUNT_ALL", + "function" : { + "expression" : "COUNT", + "parameters" : [ { + "type" : "constant", + "value" : "1" + } ], + "returntype" : "bigint" + }, + "column" : null, + "comment" : null, + "id" : 100000, + "type" : "NORMAL", + "internal_ids" : [ ] + }, { + "name" : "sumlc_decimal_tx_date", + "function" : { + "expression" : "SUM_LC", + "parameters" : [ { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.DECIMAL_DATA" + }, { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.TX_DATE" + } ], + "returntype" : "decimal(19,7)" + }, + "column" : null, + "comment" : "", + "id" : 100001, + "type" : "NORMAL", + "internal_ids" : [ ] + }, { + "name" : "sumlc_tinyint_tx_date", + "function" : { + "expression" : "SUM_LC", + "parameters" : [ { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.TINYINT_DATA" + }, { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.TX_DATE" + } ], + "returntype" : "bigint" + }, + "column" : null, + "comment" : "", + "id" : 100002, + "type" : "NORMAL", + "internal_ids" : [ ] + }, { + "name" : "sumlc_smallint_tx_date", + "function" : { + "expression" : "SUM_LC", + "parameters" : [ { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.SMALLINT_DATA" + }, { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.TX_DATE" + } ], + "returntype" : "bigint" + }, + "column" : null, + "comment" : "", + "id" : 100003, + "type" : "NORMAL", + "internal_ids" : [ ] + }, { + "name" : "sumlc_int_tx_date", + "function" : { + "expression" : "SUM_LC", + "parameters" : [ { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.INT_DATA" + }, { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.TX_DATE" + } ], + "returntype" : "bigint" + }, + "column" : null, + "comment" : "", + "id" : 100004, + "type" : "NORMAL", + "internal_ids" : [ ] + }, { + "name" : "sumlc_bigint_tx_date", + "function" : { + "expression" : "SUM_LC", + "parameters" : [ { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.BIGINT_DATA" + }, { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.TX_DATE" + } ], + "returntype" : "bigint" + }, + "column" : null, + "comment" : "", + "id" : 100005, + "type" : "NORMAL", + "internal_ids" : [ ] + }, { + "name" : "sumlc_float_tx_date", + "function" : { + "expression" : "SUM_LC", + "parameters" : [ { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.FLOAT_DATA" + }, { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.TX_DATE" + } ], + "returntype" : "double" + }, + "column" : null, + "comment" : "", + "id" : 100006, + "type" : "NORMAL", + "internal_ids" : [ ] + }, { + "name" : "sumlc_double_tx_date", + "function" : { + "expression" : "SUM_LC", + "parameters" : [ { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.DOUBLE_DATA" + }, { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.TX_DATE" + } ], + "returntype" : "double" + }, + "column" : null, + "comment" : "", + "id" : 100007, + "type" : "NORMAL", + "internal_ids" : [ ] + }, { + "name" : "sumlc_tinyint_cc_tx_date", + "function" : { + "expression" : "SUM_LC", + "parameters" : [ { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.TINYINT_CC" + }, { + "type" : "column", + "value" : "SUMLC_EXTEND_4X.TX_DATE" + } ], + "returntype" : "bigint" + }, + "column" : null, + "comment" : "", + "id" : 100008, + "type" : "NORMAL", + "internal_ids" : [ ] + } ], + "recommendations_count" : 0, + "computed_columns" : [ { + "tableIdentity" : "SSB.SUMLC_EXTEND_4X", + "tableAlias" : "SUMLC_EXTEND_4X", + "columnName" : "TINYINT_CC", + "expression" : "SUMLC_EXTEND_4X.TINYINT_DATA * 2", + "innerExpression" : "`SUMLC_EXTEND_4X`.`TINYINT_DATA` * 2", + "datatype" : "INTEGER", + "comment" : null, + "rec_uuid" : null + } ], + "canvas" : { + "coordinate" : { + "SUMLC_EXTEND_4X" : { + "x" : 307.9999966091586, + "y" : 27.444441053602443, + "width" : 200.0, + "height" : 522.2222222222221 + } + }, + "zoom" : 9.0 + }, + "multi_partition_desc" : null, + "multi_partition_key_mapping" : null, + "fusion_id" : null +} \ No newline at end of file diff --git a/src/examples/test_case_data/localmeta/metadata/sum_lc/table/SSB.SUMLC_EXTEND_4X.json b/src/examples/test_case_data/localmeta/metadata/sum_lc/table/SSB.SUMLC_EXTEND_4X.json new file mode 100644 index 0000000000..85880c9a43 --- /dev/null +++ b/src/examples/test_case_data/localmeta/metadata/sum_lc/table/SSB.SUMLC_EXTEND_4X.json @@ -0,0 +1,78 @@ +{ + "uuid" : "5c5334e5-44ec-d8a3-0357-a63d84b56b06", + "last_modified" : 0, + "create_time" : 1667465028069, + "version" : "4.0.0.0", + "name" : "SUMLC_EXTEND_4X", + "columns" : [ { + "id" : "1", + "name" : "TX_DATE", + "datatype" : "date", + "case_sensitive_name" : "tx_date" + }, { + "id" : "2", + "name" : "MILLIS", + "datatype" : "bigint", + "case_sensitive_name" : "millis" + }, { + "id" : "3", + "name" : "ACCOUNT", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "account" + }, { + "id" : "4", + "name" : "TINYINT_DATA", + "datatype" : "tinyint", + "case_sensitive_name" : "tinyint_data" + }, { + "id" : "5", + "name" : "SMALLINT_DATA", + "datatype" : "smallint", + "case_sensitive_name" : "smallint_data" + }, { + "id" : "6", + "name" : "INT_DATA", + "datatype" : "integer", + "case_sensitive_name" : "int_data" + }, { + "id" : "7", + "name" : "BIGINT_DATA", + "datatype" : "bigint", + "case_sensitive_name" : "bigint_data" + }, { + "id" : "8", + "name" : "FLOAT_DATA", + "datatype" : "double", + "case_sensitive_name" : "float_data" + }, { + "id" : "9", + "name" : "DOUBLE_DATA", + "datatype" : "double", + "case_sensitive_name" : "double_data" + }, { + "id" : "10", + "name" : "DECIMAL_DATA", + "datatype" : "decimal(9,7)", + "case_sensitive_name" : "decimal_data" + } ], + "source_type" : 9, + "table_type" : "MANAGED", + "top" : false, + "increment_loading" : false, + "last_snapshot_path" : null, + "last_snapshot_size" : 0, + "snapshot_last_modified" : 0, + "query_hit_count" : 0, + "partition_column" : null, + "snapshot_partitions" : { }, + "snapshot_partitions_info" : { }, + "snapshot_total_rows" : 0, + "snapshot_partition_col" : null, + "selected_snapshot_partition_col" : null, + "temp_snapshot_path" : null, + "snapshot_has_broken" : false, + "database" : "SSB", + "transactional" : false, + "rangePartition" : false, + "partition_desc" : null +} \ No newline at end of file diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java index 61720f727c..34160d76c6 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java @@ -56,6 +56,7 @@ import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore; import org.apache.kylin.query.pushdown.PushDownRunnerSparkImpl; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.request.GarbageCleanUpConfigRequest; @@ -97,7 +98,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.kyligence.kap.clickhouse.MockSecondStorage; -import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore; import lombok.val; import lombok.var; import lombok.extern.slf4j.Slf4j; @@ -235,14 +235,14 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase { public void testGetReadableProjects() { Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class)); List<ProjectInstance> projectInstances = projectService.getReadableProjects("", false); - Assert.assertEquals(27, projectInstances.size()); + Assert.assertEquals(28, projectInstances.size()); } @Test public void testGetAdminProjects() throws Exception { Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class)); List<ProjectInstance> projectInstances = projectService.getAdminProjects(); - Assert.assertEquals(27, projectInstances.size()); + Assert.assertEquals(28, projectInstances.size()); } @Test @@ -256,7 +256,7 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase { public void testGetReadableProjectsHasNoPermissionProject() { Mockito.doReturn(true).when(aclEvaluate).hasProjectAdminPermission(Mockito.any(ProjectInstance.class)); List<ProjectInstance> projectInstances = projectService.getReadableProjects("", false); - Assert.assertEquals(27, projectInstances.size()); + Assert.assertEquals(28, projectInstances.size()); } diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java index bb004b174d..73c50d3d64 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapAggregateRel.java @@ -46,13 +46,13 @@ import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.measure.corr.CorrMeasureType; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.PartitionDesc; -import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate; import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MultiPartitionDesc; import org.apache.kylin.metadata.model.NDataModel; +import org.apache.kylin.metadata.model.PartitionDesc; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.query.util.ICutContextStrategy; import com.google.common.collect.ImmutableList; @@ -65,7 +65,7 @@ import com.google.common.collect.Sets; public class KapAggregateRel extends OLAPAggregateRel implements KapRel { protected static final List<String> supportedFunction = Lists.newArrayList("SUM", "MIN", "MAX", "COUNT_DISTINCT", - "BITMAP_UUID", "PERCENTILE_APPROX", FunctionDesc.FUNC_BITMAP_BUILD); + "BITMAP_UUID", "PERCENTILE_APPROX", FunctionDesc.FUNC_BITMAP_BUILD, FunctionDesc.FUNC_SUM_LC); private ImmutableList<Integer> rewriteGroupKeys; // preserve the ordering of group keys after CC replacement private List<ImmutableBitSet> rewriteGroupSets; // group sets with cc replaced List<AggregateCall> aggregateCalls; @@ -203,8 +203,8 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { TblColRef originalColumn = inputColumnRowType.getColumnByIndex(i); if (null != this.context && this.context.getGroupCCColRewriteMapping().containsKey(originalColumn)) { groups.add(this.context.getGroupCCColRewriteMapping().get(originalColumn)); - groupKeys - .add(inputColumnRowType.getIndexByName(this.context.getGroupCCColRewriteMapping().get(originalColumn).getName())); + groupKeys.add(inputColumnRowType + .getIndexByName(this.context.getGroupCCColRewriteMapping().get(originalColumn).getName())); } else { Set<TblColRef> sourceColumns = inputColumnRowType.getSourceColumnsByIndex(i); groups.addAll(sourceColumns); @@ -307,7 +307,6 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { // rebuild rowType & columnRowType this.rowType = this.deriveRowType(); this.columnRowType = this.buildColumnRowType(); - } private Boolean isExactlyMatched() { @@ -330,7 +329,8 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { return false; } - if (!checkAggCall()) return false; + if (!checkAggCall()) + return false; Set<String> cuboidDimSet = new HashSet<>(); if (getContext() != null && getContext().storageContext.getCandidate() != null) { cuboidDimSet = getContext().storageContext.getCandidate().getLayoutEntity().getOrderedDimensions().values() @@ -403,7 +403,7 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { private boolean isDimExactlyMatch(Set<String> groupByCols, Set<String> cuboidDimSet) { return groupByCols.equals(cuboidDimSet) && isSimpleGroupType() && (this.context.getInnerGroupByColumns().isEmpty() - || !this.context.getGroupCCColRewriteMapping().isEmpty()); + || !this.context.getGroupCCColRewriteMapping().isEmpty()); } @@ -502,7 +502,8 @@ public class KapAggregateRel extends OLAPAggregateRel implements KapRel { } public boolean isContainCountDistinct() { - return aggregateCalls.stream().anyMatch(agg -> agg.getAggregation().getKind() == SqlKind.COUNT && agg.isDistinct()); + return aggregateCalls.stream() + .anyMatch(agg -> agg.getAggregation().getKind() == SqlKind.COUNT && agg.isDistinct()); } } diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java index e944266a82..d68329c0a0 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java @@ -101,6 +101,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { protected List<AggregateCall> rewriteAggCalls; protected List<TblColRef> groups; protected List<FunctionDesc> aggregations; + public OLAPAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) throws InvalidRelException { @@ -501,7 +502,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { // rebuild aggregate call return new AggregateCall(newAgg, false, newArgList, fieldType, callName); - } /** diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java index a53f9af8e7..c1f23bb5d7 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java @@ -365,7 +365,7 @@ public class QueryHistoryServiceTest extends NLocalFileMetadataTestCase { // get all tables tableMap = queryHistoryService.getQueryHistoryTableMap(null); - Assert.assertEquals(27, tableMap.size()); + Assert.assertEquals(28, tableMap.size()); // not existing project try { diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java index fe1d092ee3..81439609e9 100644 --- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java +++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java @@ -50,6 +50,7 @@ import org.apache.kylin.metadata.query.StructField; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException; import org.apache.kylin.query.engine.data.QueryResult; +import org.apache.kylin.query.exception.NotSupportedSQLException; import org.apache.kylin.query.mask.QueryResultMasks; import org.apache.kylin.query.relnode.OLAPContext; import org.apache.kylin.query.util.PushDownUtil; @@ -81,6 +82,7 @@ public class QueryRoutingEngine { queryParams.setDefaultSchema(queryExec.getDefaultSchemaName()); if (queryParams.isForcedToPushDown()) { + checkContainsSumLC(queryParams, null); return pushDownQuery(null, queryParams); } @@ -128,6 +130,7 @@ public class QueryRoutingEngine { if (cause instanceof SQLException && cause.getCause() instanceof KylinException) { throw (SQLException) cause; } + checkContainsSumLC(queryParams, e); if (shouldPushdown(cause, queryParams)) { return pushDownQuery((SQLException) cause, queryParams); } else { @@ -146,6 +149,7 @@ public class QueryRoutingEngine { } } } + checkContainsSumLC(queryParams, e); if (shouldPushdown(e, queryParams)) { return pushDownQuery(e, queryParams); } else { @@ -167,6 +171,17 @@ public class QueryRoutingEngine { return false; } + private void checkContainsSumLC(QueryParams queryParams, Throwable t) { + if (queryParams.getSql().contains("sum_lc")) { + String message = "There is no aggregate index to answer this query, sum_lc() function now is not supported by other query engine"; + if (t != null) { + throw new NotSupportedSQLException(message, t); + } else { + throw new NotSupportedSQLException(message); + } + } + } + private boolean shouldPushdown(Throwable e, QueryParams queryParams) { if (queryParams.isForcedToIndex()) { return false; diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java index 9a1725f1bc..93fe27494f 100644 --- a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java @@ -17,11 +17,11 @@ */ package org.apache.kylin.query.engine; -import static org.apache.kylin.query.util.AsyncQueryUtil.ASYNC_QUERY_JOB_ID_PRE; import static org.apache.kylin.metadata.cube.model.NBatchConstants.P_DIST_META_URL; import static org.apache.kylin.metadata.cube.model.NBatchConstants.P_JOB_ID; import static org.apache.kylin.metadata.cube.model.NBatchConstants.P_QUERY_CONTEXT; import static org.apache.kylin.metadata.cube.model.NBatchConstants.P_QUERY_PARAMS; +import static org.apache.kylin.query.util.AsyncQueryUtil.ASYNC_QUERY_JOB_ID_PRE; import java.io.BufferedReader; import java.io.IOException; @@ -45,11 +45,11 @@ import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.ShellException; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.query.util.QueryParams; -import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -217,7 +217,7 @@ public class AsyncQueryJobTest extends NLocalFileMetadataTestCase { rawResourceMap.put(zipEntry.getName(), raw); } } - Assert.assertEquals(83, rawResourceMap.size()); + Assert.assertEquals(84, rawResourceMap.size()); } private void testKylinConfig(FileSystem workingFileSystem, FileStatus metaFileStatus) throws IOException { diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java index f929d50a39..7599095bf6 100644 --- a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java @@ -39,6 +39,7 @@ import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException; import org.apache.kylin.query.QueryExtension; import org.apache.kylin.query.engine.data.QueryResult; +import org.apache.kylin.query.exception.NotSupportedSQLException; import org.apache.kylin.query.util.QueryParams; import org.apache.kylin.source.adhocquery.PushdownResult; import org.apache.spark.SparkException; @@ -304,4 +305,19 @@ public class QueryRoutingEngineTest extends NLocalFileMetadataTestCase { QueryResult queryResult = queryRoutingEngine.queryWithSqlMassage(queryParams); Assert.assertEquals(0, queryResult.getSize()); } + + @Test + public void testQueryPushDownWithSumLC() { + final String sql = "select sum_lc(column, dateColumn) from success_table_2"; + final String project = "default"; + KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv(); + QueryParams queryParams = new QueryParams(); + queryParams.setProject(project); + queryParams.setSql(sql); + queryParams.setKylinConfig(kylinconfig); + queryParams.setSelect(true); + queryParams.setForcedToPushDown(true); + + Assert.assertThrows(NotSupportedSQLException.class, () -> queryRoutingEngine.queryWithSqlMassage(queryParams)); + } } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala index e1f538bad8..f81e6f7d37 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala @@ -20,19 +20,19 @@ package org.apache.kylin.engine.spark.job import org.apache.kylin.common.KapConfig import org.apache.kylin.engine.spark.builder.DFBuilderHelper.ENCODE_SUFFIX +import org.apache.kylin.measure.bitmap.BitmapMeasureType +import org.apache.kylin.measure.hllc.HLLCMeasureType import org.apache.kylin.metadata.cube.cuboid.NSpanningTree import org.apache.kylin.metadata.cube.model.{NCubeJoinedFlatTableDesc, NDataSegment} import org.apache.kylin.metadata.model.NDataModel.Measure -import org.apache.kylin.measure.bitmap.BitmapMeasureType -import org.apache.kylin.measure.hllc.HLLCMeasureType import org.apache.kylin.metadata.model.TblColRef -import org.apache.spark.sql.functions.{col, _} +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.udaf._ import org.apache.spark.sql.util.SparderTypeUtil import org.apache.spark.sql.util.SparderTypeUtil.toSparkType -import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.{Column, DataFrame} import java.util import java.util.Locale @@ -60,20 +60,20 @@ object CuboidAggregator { dataset.schema.fieldNames.zipWithIndex.map(tp => (tp._2, tp._1)).toMap aggregate(dataset, dimensions, measures, // - (colRef: TblColRef) => columnIndex.apply(flatTableDesc.getColumnIndex(colRef)) ) + (colRef: TblColRef) => columnIndex.apply(flatTableDesc.getColumnIndex(colRef))) } /** - * Avoid compilation error when invoking aggregate in java - * incompatible types: Function1 is not a functional interface - * - * @param dataset - * @param dimensions - * @param measures - * @param tableDesc - * @param isSparkSQL - * @return - */ + * Avoid compilation error when invoking aggregate in java + * incompatible types: Function1 is not a functional interface + * + * @param dataset + * @param dimensions + * @param measures + * @param tableDesc + * @param isSparkSQL + * @return + */ def aggregateJava(dataset: DataFrame, dimensions: util.Set[Integer], measures: util.Map[Integer, Measure], @@ -223,6 +223,15 @@ object CuboidAggregator { } case "CORR" => new Column(Literal(null, DoubleType)).as(measureEntry._1.toString) + case "SUM_LC" => + val colDataType = function.getReturnDataType + val sparkDataType = toSparkType(colDataType) + if (reuseLayout) { + new Column(ReuseSumLC(columns.head.expr, sparkDataType).toAggregateExpression()).as(measureEntry._1.toString) + } else { + new Column(EncodeSumLC(columns.head.expr, columns.drop(1).head.expr, sparkDataType) + .toAggregateExpression()).as(measureEntry._1.toString) + } } }.toSeq diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala index 8a2682d460..714b42d15d 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala @@ -17,11 +17,11 @@ */ package org.apache.kylin.query.runtime.plan -import org.apache.kylin.engine.spark.utils.LogEx import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rex.RexLiteral import org.apache.calcite.sql.SqlKind import org.apache.kylin.common.KylinConfig +import org.apache.kylin.engine.spark.utils.LogEx import org.apache.kylin.metadata.model.FunctionDesc import org.apache.kylin.query.relnode.{KapAggregateRel, KapProjectRel, KylinAggregateCall, OLAPAggregateRel} import org.apache.kylin.query.util.RuntimeHelper @@ -43,7 +43,8 @@ import scala.collection.JavaConverters._ // scalastyle:off object AggregatePlan extends LogEx { val binaryMeasureType = - List("PERCENTILE", "PERCENTILE_APPROX", "INTERSECT_COUNT", "COUNT_DISTINCT", "BITMAP_UUID", FunctionDesc.FUNC_BITMAP_BUILD) + List("PERCENTILE", "PERCENTILE_APPROX", "INTERSECT_COUNT", "COUNT_DISTINCT", "BITMAP_UUID", + FunctionDesc.FUNC_BITMAP_BUILD, FunctionDesc.FUNC_SUM_LC) def agg(inputs: java.util.List[DataFrame], rel: KapAggregateRel): DataFrame = logTime("aggregate", debug = true) { @@ -83,6 +84,10 @@ object AggregatePlan extends LogEx { case FunctionDesc.FUNC_BITMAP_BUILD => val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, "BITMAP_BUILD_DECODE", hash, argNames: _*) KapFunctions.precise_bitmap_build_decode(columnName.head).alias(aggName) + case FunctionDesc.FUNC_SUM_LC => + val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, "SUM_LC_DECODE", hash, argNames: _*) + val sparkDataType = SparderTypeUtil.toSparkType(dataType) + KapFunctions.k_sum_lc_decode(columnName.head, sparkDataType.json).alias(aggName) case _ => col(schemaNames.apply(call.getArgList.get(0))) } @@ -185,6 +190,8 @@ object AggregatePlan extends LogEx { } else if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_PERCENTILE)) { require(columnName.size == 2, s"Input columns size ${columnName.size} don't equal to 2.") KapFunctions.k_percentile(columnName.head, columnName(1), dataType.getPrecision).alias(aggName) + } else if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_SUM_LC)) { + KapFunctions.k_sum_lc(columnName.head, SparderTypeUtil.toSparkType(dataType)).alias(aggName) } else { callUDF(registeredFuncName, columnName.toList: _*).alias(aggName) } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala index eae6251f1a..7d6d9e4cb2 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala @@ -21,13 +21,11 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.ExpressionUtils.expression import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction -import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, CeilDateTime, DictEncode, DictEncodeV3, Expression, ExpressionInfo, FloorDateTime, ImplicitCastInputTypes, In, KapAddMonths, KapSubtractMonths, Like, Literal, PercentileDecode, PreciseCountDistinctDecode, RLike, RoundBase, SplitPart, Sum0, TimestampAdd, TimestampDiff, Truncate} -import org.apache.spark.sql.types.{ArrayType, BinaryType, DoubleType, LongType, StringType} import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, CeilDateTime, DictEncode, EmptyRow, Expression, ExpressionInfo, FloorDateTime, ImplicitCastInputTypes, In, KapAddMonths, KapSubtractMonths, Like, Literal, PercentileDecode, PreciseCountDistinctDecode, RLike, RoundBase, SplitPart, Sum0, TimestampAdd, TimestampDiff, Truncate} -import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DataType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType} -import org.apache.spark.sql.udaf.{ApproxCountDistinct, IntersectCount, Percentile, PreciseBitmapBuildBase64Decode, PreciseBitmapBuildBase64WithIndex, PreciseBitmapBuildPushDown, PreciseCardinality, PreciseCountDistinct, PreciseCountDistinctAndArray, PreciseCountDistinctAndValue, ReusePreciseCountDistinct} +import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, CeilDateTime, DictEncode, DictEncodeV3, EmptyRow, Expression, ExpressionInfo, FloorDateTime, ImplicitCastInputTypes, In, KapAddMonths, KapSubtractMonths, Like, Literal, PercentileDecode, PreciseCountDistinctDecode, RLike, RoundBase, SplitPart, Sum0, SumLCDecode, TimestampAdd, TimestampDiff, Truncate} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.udaf._ object KapFunctions { @@ -62,6 +60,12 @@ object KapFunctions { def in(value: Expression, list: Seq[Expression]): Column = Column(In(value, list)) + def k_sum_lc(measureCol: Column, wrapDataType: DataType): Column = + Column(ReuseSumLC(measureCol.expr, wrapDataType, wrapDataType).toAggregateExpression()) + + def k_sum_lc_decode(measureCol: Column, wrapDataType: String): Column = + Column(SumLCDecode(measureCol.expr, Literal(wrapDataType))) + def k_percentile(head: Column, column: Column, precision: Int): Column = Column(Percentile(head.expr, precision, Some(column.expr), DoubleType).toAggregateExpression()) diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala index ea49dac47e..321bece62c 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala @@ -22,8 +22,8 @@ import org.apache.kylin.measure.hllc.HLLCounter import org.apache.kylin.measure.percentile.PercentileSerializer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder -import org.apache.spark.sql.types.Decimal -import org.apache.spark.sql.udaf.BitmapSerAndDeSerObj +import org.apache.spark.sql.types.{DataType, Decimal} +import org.apache.spark.sql.udaf.{BitmapSerAndDeSerObj, SumLCUtil} import java.nio.ByteBuffer import scala.reflect.ClassTag @@ -117,15 +117,15 @@ object ExpressionUtils { } def approxCountDistinctDecodeHelper(bytes: Any, precision: Any): Long = { - val storageFormat = bytes.asInstanceOf[Array[Byte]] - val preciseValue = precision.asInstanceOf[Int] - if (storageFormat.nonEmpty) { - val counter = new HLLCounter(preciseValue) - counter.readRegisters(ByteBuffer.wrap(storageFormat)) - counter.getCountEstimate - } else { - 0L - } + val storageFormat = bytes.asInstanceOf[Array[Byte]] + val preciseValue = precision.asInstanceOf[Int] + if (storageFormat.nonEmpty) { + val counter = new HLLCounter(preciseValue) + counter.readRegisters(ByteBuffer.wrap(storageFormat)) + counter.getCountEstimate + } else { + 0L + } } def percentileDecodeHelper(bytes: Any, quantile: Any, precision: Any): Double = { @@ -134,5 +134,12 @@ object ExpressionUtils { val counter = serializer.deserialize(ByteBuffer.wrap(arrayBytes)) counter.getResultEstimateWithQuantileRatio(quantile.asInstanceOf[Decimal].toDouble) } -} + def sumLCDecodeHelper(bytes: Any, wrapDataType: Any): Number = { + val arrayBytes = bytes.asInstanceOf[Array[Byte]] + val codec = SumLCUtil.getNumericNullSafeSerializerByDataType(DataType.fromJson(wrapDataType.toString)) + val counter = SumLCUtil.decodeToSumLCCounter(arrayBytes, codec) + counter.getSumLC + } + +} diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala index 24e25c940f..ea28802be5 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala @@ -125,7 +125,6 @@ case class KapSubtractMonths(a: Expression, b: Expression) import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.util.TypeUtils -import org.apache.spark.sql.types._ @ExpressionDescription( usage = "_FUNC_(expr) - Returns the sum calculated from values of a group. " + @@ -772,4 +771,65 @@ case class PercentileDecode(bytes: Expression, quantile: Expression, precision: val newChildren = Seq(newFirst, newSecond, newThird) super.legacyWithNewChildren(newChildren) } -} \ No newline at end of file +} + +case class SumLCDecode(bytes: Expression, wrapDataTypeExpr: Expression) extends BinaryExpression with ExpectsInputTypes { + override def left: Expression = bytes; + + override def right: Expression = wrapDataTypeExpr + + override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType, StringType) + + def wrapDataType = DataType.fromJson(wrapDataTypeExpr.toString) + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val leftGen = left.genCode(ctx) + val rightGen = right.genCode(ctx) + val expressionUtils = ExpressionUtils.getClass.getName.stripSuffix("$") + val decimalUtil = classOf[Decimal].getName + val evalValue = ctx.freshName("evalValue") + val javaType = CodeGenerator.javaType(dataType) + val boxedJavaType = CodeGenerator.boxedType(javaType) + val commonCodeBlock = + code""" + ${leftGen.code} + ${rightGen.code} + Number $evalValue = $expressionUtils.sumLCDecodeHelper(${leftGen.value}, ${rightGen.value}); + boolean ${ev.isNull} = $evalValue == null; + $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; + """ + val conditionCodeBlock = if (wrapDataType.isInstanceOf[DecimalType]) { + code""" + if(!${ev.isNull}) { + ${ev.value} = $decimalUtil.fromDecimal($evalValue); + } + """ + } else { + code""" + if(!${ev.isNull}) { + ${ev.value} = ($boxedJavaType) $evalValue; + } + """ + } + ev.copy(code = commonCodeBlock + conditionCodeBlock) + } + + override protected def nullSafeEval(bytes: Any, wrapDataTypeExpr: Any): Any = { + val decodeVal = ExpressionUtils.sumLCDecodeHelper(bytes, wrapDataTypeExpr) + wrapDataType match { + case DecimalType() => + Decimal.fromDecimal(decodeVal.asInstanceOf[java.math.BigDecimal]) + case _ => + decodeVal + } + } + + override def dataType: DataType = wrapDataType + + override def prettyName: String = "sum_lc_decode" + + override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression = { + val newChildren = Seq(newLeft, newRight) + super.legacyWithNewChildren(newChildren) + } +} diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/SparderAggFun.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/SparderAggFun.scala index 422951291d..1397251368 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/SparderAggFun.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/SparderAggFun.scala @@ -18,9 +18,6 @@ package org.apache.spark.sql.udf -import java.nio.ByteBuffer -import java.util - import com.google.common.collect.Maps import org.apache.kylin.measure.MeasureAggregator import org.apache.kylin.measure.bitmap.BitmapCounter @@ -35,6 +32,9 @@ import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAg import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SparderTypeUtil +import java.nio.ByteBuffer +import java.util + class SparderAggFun(funcName: String, dataTp: KyDataType) extends UserDefinedAggregateFunction with Logging { @@ -89,7 +89,7 @@ class SparderAggFun(funcName: String, dataTp: KyDataType) byteBuffer = ByteBuffer.allocate(1024 * 1024) } - val initVal = if (isCount) { + val initVal = if (isCount) { // return 0 instead of null in case of no input measureAggregator.reset() byteBuffer.clear() @@ -106,6 +106,7 @@ class SparderAggFun(funcName: String, dataTp: KyDataType) override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { merge(buffer, input) } + override def merge(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { measureAggregator.reset() @@ -167,7 +168,7 @@ class SparderAggFun(funcName: String, dataTp: KyDataType) case _ => null } - ret + ret } } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/LayoutEntityConverter.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/LayoutEntityConverter.scala index 528c8cb520..429cdfbbe9 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/LayoutEntityConverter.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/LayoutEntityConverter.scala @@ -49,11 +49,11 @@ object LayoutEntityConverter { bucketSpec = Option(bucketSp)) } - def toSchema() : StructType = { + def toSchema(): StructType = { genCuboidSchemaFromNCuboidLayout(layoutEntity) } - def toExactlySchema() : StructType = { + def toExactlySchema(): StructType = { genCuboidSchemaFromNCuboidLayout(layoutEntity, true) } } @@ -122,6 +122,7 @@ object LayoutEntityConverter { genSparkStructField(i._1.toString, i._2) }.toSeq ++ measures) } + def genBucketSpec(layoutEntity: LayoutEntity, partitionColumn: Set[String]): Option[BucketSpec] = { if (layoutEntity.getShardByColumns.isEmpty) { Option(BucketSpec(layoutEntity.getBucketNum, @@ -161,6 +162,7 @@ object LayoutEntityConverter { case "COLLECT_SET" => val parameter = function.getParameters.get(0) ArrayType(SparderTypeUtil.toSparkType(parameter.getColRef.getType)) + case "SUM_LC" => BinaryType case _ => SparderTypeUtil.toSparkType(function.getReturnDataType) } } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala index 75a035a344..92fd5dd0d5 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/NullSafeValueSerializer.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql.udaf +import org.apache.spark.unsafe.types.UTF8String + import java.io.{DataInput, DataOutput} import java.nio.charset.StandardCharsets -import org.apache.spark.unsafe.types.UTF8String - @SerialVersionUID(1) sealed trait NullSafeValueSerializer { final def serialize(output: DataOutput, value: Any): Unit = { @@ -150,6 +150,29 @@ class StringSerializer extends NullSafeValueSerializer { class DecimalSerializer extends NullSafeValueSerializer { override def serialize0(output: DataOutput, value: Any): Unit = { val decimal = value.asInstanceOf[BigDecimal] + DecimalCodecUtil.encode(decimal, output) + } + + override def deSerialize0(input: DataInput, length: Int): Any = { + DecimalCodecUtil.decode(input) + } +} + +@SerialVersionUID(1) +class JavaBigDecimalSerializer extends NullSafeValueSerializer { + override def serialize0(output: DataOutput, value: Any): Unit = { + val decimal = BigDecimal.apply(value.asInstanceOf[java.math.BigDecimal]) + DecimalCodecUtil.encode(decimal, output); + } + + override def deSerialize0(input: DataInput, length: Int): Any = { + val decimal = DecimalCodecUtil.decode(input) + decimal.asInstanceOf[BigDecimal].bigDecimal + } +} + +object DecimalCodecUtil { + def encode(decimal: BigDecimal, output: DataOutput): Unit = { val bytes = decimal.toString().getBytes(StandardCharsets.UTF_8) output.writeInt(1 + bytes.length) output.writeByte(decimal.scale) @@ -157,7 +180,7 @@ class DecimalSerializer extends NullSafeValueSerializer { output.write(bytes) } - override def deSerialize0(input: DataInput, length: Int): Any = { + def decode(input: DataInput): Any = { val scale = input.readByte() val length = input.readInt() val bytes = new Array[Byte](length) @@ -166,4 +189,3 @@ class DecimalSerializer extends NullSafeValueSerializer { decimal.setScale(scale) } } - diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/SumLC.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/SumLC.scala new file mode 100644 index 0000000000..bb68841e01 --- /dev/null +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/udaf/SumLC.scala @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.udaf + +import com.esotericsoftware.kryo.KryoException +import com.esotericsoftware.kryo.io.{Input, KryoDataInput, KryoDataOutput, Output} +import org.apache.kylin.common.util.DateFormat +import org.apache.kylin.measure.sumlc.SumLCCounter +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate} +import org.apache.spark.sql.types._ + +/** + * Build sum_lc measure, has two implements, + * for non-reuse sum_lc, two input, one for value column, another for date column + * for reuse sum_lc,construct measure from parent layout + * + * @param wrapDataType return serialize data type + * @param outputDataType return calculate data type, see Percentile outputType + * @param mutableAggBufferOffset default value + * @param inputAggBufferOffset default value + */ +sealed abstract class BaseSumLC(wrapDataType: DataType, + outputDataType: DataType = BinaryType, + mutableAggBufferOffset: Int = 0, + inputAggBufferOffset: Int = 0) + extends TypedImperativeAggregate[SumLCCounter] with Serializable with Logging { + lazy val serializer: NullSafeValueSerializer = SumLCUtil.getNumericNullSafeSerializerByDataType(wrapDataType) + + override def prettyName: String = "sum_lc" + + override def eval(buffer: SumLCCounter): Any = { + outputDataType match { + case BinaryType => + serialize(buffer) + case DecimalType() => + if (buffer.getSumLC != null) { + Decimal.fromDecimal(buffer.getSumLC.asInstanceOf[java.math.BigDecimal]) + } else { + Decimal.ZERO + } + case _ => + buffer.getSumLC + } + } + + override def createAggregationBuffer(): SumLCCounter = new SumLCCounter() + + override def merge(buffer: SumLCCounter, input: SumLCCounter): SumLCCounter = { + SumLCCounter.merge(buffer, input) + } + + override def serialize(buffer: SumLCCounter): Array[Byte] = { + val array: Array[Byte] = new Array[Byte](1024 * 1024) + val output: Output = new Output(array) + serialize(buffer, array, output) + } + + private def serialize(buffer: SumLCCounter, array: Array[Byte], output: Output): Array[Byte] = { + try { + if (buffer == null) { + Array.empty[Byte] + } else { + output.clear() + val out = new KryoDataOutput(output) + serializer.serialize(out, buffer.getSumLC) + out.writeLong(buffer.getTimestamp) + val mark = output.position() + output.close() + array.slice(0, mark) + } + } catch { + case th: KryoException if th.getMessage.contains("Buffer overflow") => + logWarning(s"Resize buffer size to ${array.length * 2}") + val updateArray = new Array[Byte](array.length * 2) + output.setBuffer(updateArray) + serialize(buffer, updateArray, output) + case th => + throw th + } + } + + override def deserialize(bytes: Array[Byte]): SumLCCounter = { + SumLCUtil.decodeToSumLCCounter(bytes, serializer) + } + + override def nullable: Boolean = false + + override def dataType: DataType = outputDataType +} + +case class EncodeSumLC( + evalCol: Expression, + dateCol: Expression, + wrapDataType: DataType, + outputDataType: DataType = BinaryType, + mutableAggBufferOffset: Int = 0, + inputAggBufferOffset: Int = 0) + extends BaseSumLC(wrapDataType, outputDataType, mutableAggBufferOffset, inputAggBufferOffset) { + + override def update(buffer: SumLCCounter, input: InternalRow): SumLCCounter = { + val columnEvalVal = evalCol.eval(input) + val columnVal = columnEvalVal match { + case decimal: Decimal => + decimal.toJavaBigDecimal + case _ => + columnEvalVal.asInstanceOf[Number] + } + val dateValStr = String.valueOf(dateCol.eval(input)).trim + val timestampVal = DateFormat.stringToMillis(dateValStr) + SumLCCounter.merge(buffer, columnVal, timestampVal) + } + + override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate = + copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = + copy(inputAggBufferOffset = newInputAggBufferOffset) + + override def children: Seq[Expression] = Seq(evalCol, dateCol) + + override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = + super.legacyWithNewChildren(newChildren) +} + +case class ReuseSumLC(measure: Expression, + wrapDataType: DataType, + outputDataType: DataType = BinaryType, + mutableAggBufferOffset: Int = 0, + inputAggBufferOffset: Int = 0) + extends BaseSumLC(wrapDataType, outputDataType, mutableAggBufferOffset, inputAggBufferOffset) { + + override def update(buffer: SumLCCounter, input: InternalRow): SumLCCounter = { + val evalCounter = deserialize(measure.eval(input).asInstanceOf[Array[Byte]]) + SumLCCounter.merge(buffer, evalCounter) + } + + override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate = + copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = + copy(inputAggBufferOffset = newInputAggBufferOffset) + + override def children: Seq[Expression] = Seq(measure) + + override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = + super.legacyWithNewChildren(newChildren) + +} + +object SumLCUtil extends Logging { + + def decodeToSumLCCounter(bytes: Array[Byte], codec: NullSafeValueSerializer): SumLCCounter = { + if (bytes.nonEmpty) { + val in = new KryoDataInput(new Input(bytes)) + val sumLC = codec.deserialize(in).asInstanceOf[Number] + val timestamp = in.readLong() + new SumLCCounter(sumLC, timestamp) + } else { + new SumLCCounter() + } + } + + def getNumericNullSafeSerializerByDataType(dataType: org.apache.spark.sql.types.DataType): NullSafeValueSerializer = { + dataType match { + case LongType => new LongSerializer + case DoubleType => new DoubleSerializer + case DecimalType() => new JavaBigDecimalSerializer + case dt => throw new UnsupportedOperationException("Unsupported sum_lc dimension type: " + dt) + } + } + +}