This is an automated email from the ASF dual-hosted git repository.
jlfsdtc pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new abdc910760 KYLIN-6078 submit jobs when refreshing specified partitions
abdc910760 is described below
commit abdc9107606844e56d1581079a0ec3a62f04e953
Author: jlf <[email protected]>
AuthorDate: Mon Sep 15 10:56:03 2025 +0800
KYLIN-6078 submit jobs when refreshing specified partitions
---
.../rest/request/InternalTableBuildRequest.java | 2 +-
.../kylin/job/handler/InternalTableJobHandler.java | 2 +
.../kylin/metadata/cube/model/NBatchConstants.java | 8 +-
.../java/org/apache/kylin/util/DataRangeUtils.java | 8 +-
.../kylin/rest/response/ExecutableResponse.java | 15 ++--
.../rest/service/InternalTableServiceTest.java | 13 ++-
.../apache/kylin/rest/service/JobServiceTest.java | 5 --
.../rest/service/InternalTableLoadingService.java | 93 ++++++++++++----------
.../kylin/rest/service/InternalTableService.java | 3 +
.../engine/spark/job/InternalTableLoadJob.java | 8 +-
.../engine/spark/job/InternalTableLoadingJob.java | 23 ++----
.../spark/job/InternalTableUpdateMetadataStep.java | 51 ++++++++----
.../engine/spark/builder/InternalTableLoader.scala | 5 +-
13 files changed, 141 insertions(+), 95 deletions(-)
diff --git
a/src/common-service/src/main/java/org/apache/kylin/rest/request/InternalTableBuildRequest.java
b/src/common-service/src/main/java/org/apache/kylin/rest/request/InternalTableBuildRequest.java
index 434f1bdbaf..49359e817c 100644
---
a/src/common-service/src/main/java/org/apache/kylin/rest/request/InternalTableBuildRequest.java
+++
b/src/common-service/src/main/java/org/apache/kylin/rest/request/InternalTableBuildRequest.java
@@ -41,6 +41,6 @@ public class InternalTableBuildRequest {
private String yarnQueue;
@JsonProperty("partitions")
- private String[] partitions;
+ private String[] partitions = new String[] {};
}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
b/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
index d127ccb76c..908f4f8c75 100644
---
a/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
+++
b/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
@@ -39,6 +39,7 @@ public class InternalTableJobHandler extends
AbstractJobHandler {
private String endDate;
private String deletePartitionValues;
private String deletePartition; // true or false
+ private String refreshPartitionValues;
public InternalTableJobBuildParam(JobParam jobParam) {
super(null, null, jobParam.getOwner(), jobParam.getJobTypeEnum(),
jobParam.getJobId(), null, null, null,
@@ -51,6 +52,7 @@ public class InternalTableJobHandler extends
AbstractJobHandler {
this.endDate =
jobParam.getExtParams().get(NBatchConstants.P_END_DATE);
this.deletePartitionValues =
jobParam.getExtParams().get(NBatchConstants.P_DELETE_PARTITION_VALUES);
this.deletePartition =
jobParam.getExtParams().get(NBatchConstants.P_DELETE_PARTITION);
+ this.refreshPartitionValues =
jobParam.getExtParams().get(NBatchConstants.P_REFRESH_PARTITION_VALUES);
}
}
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
index fab3fab21a..ea34305552 100644
---
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
@@ -76,7 +76,7 @@ public interface NBatchConstants {
String P_DELETE_PARTITION = "deletePartition";
String P_SORT_BY_PARTITION_BEFORE_SAVE = "sortByPartition";
String P_PRELOADED_CACHE = "preloadedCache";
-
+ String P_REFRESH_PARTITION_VALUES = "refreshPartitionValues";
/** index planner job parameters */
String P_PLANNER_INITIALIZE_CUBOID_COUNT =
"kylin.planner.initializeCuboidCount";
@@ -92,10 +92,8 @@ public interface NBatchConstants {
@Getter
enum TblPropertyKey {
- PRIMARY_KEY(P_PRIMARY_KEY),
- ORDER_BY_KEY(P_ORDER_BY_KEY),
- BUCKET_COLUMN(P_BUCKET_COLUMN),
- BUCKET_NUM(P_BUCKET_NUM);
+ PRIMARY_KEY(P_PRIMARY_KEY), ORDER_BY_KEY(P_ORDER_BY_KEY),
BUCKET_COLUMN(P_BUCKET_COLUMN), BUCKET_NUM(
+ P_BUCKET_NUM);
private final String value;
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/util/DataRangeUtils.java
b/src/core-metadata/src/main/java/org/apache/kylin/util/DataRangeUtils.java
index ccbc6556ce..f23cba0205 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/util/DataRangeUtils.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/util/DataRangeUtils.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.metadata.model.PartitionDesc;
public final class DataRangeUtils {
@@ -169,13 +170,18 @@ public final class DataRangeUtils {
if (values == null || values.isEmpty()) {
return mergedRanges;
}
-
SimpleDateFormat sdf = new SimpleDateFormat(dateFormat, Locale.ROOT);
try {
List<Date> dates = new ArrayList<>();
for (String value : values) {
+ if (StringUtils.isEmpty(value)) {
+ continue;
+ }
dates.add(sdf.parse(value));
}
+ if (dates.isEmpty()) {
+ return Lists.newArrayList();
+ }
// Sort the dates
dates.sort(Date::compareTo);
Date start = dates.get(0);
diff --git
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
index ddf6635482..24080785fe 100644
---
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
+++
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
@@ -84,6 +84,8 @@ public class ExecutableResponse implements
Comparable<ExecutableResponse> {
private ExecutableState schedulerState;
@JsonProperty("job_name")
private String jobName;
+ @JsonProperty("data_range_partitions")
+ private String dataRangePartitions;
@JsonProperty("data_range_start")
private long dataRangeStart;
@JsonProperty("data_range_end")
@@ -202,9 +204,12 @@ public class ExecutableResponse implements
Comparable<ExecutableResponse> {
}
} else if (abstractExecutable instanceof InternalTableLoadingJob) {
InternalTableLoadingJob internalTableJob =
(InternalTableLoadingJob) abstractExecutable;
- if ("false".equals(internalTableJob.getParam("incrementalBuild"))
- ||
"true".equals(internalTableJob.getParam("deletePartition"))) {
+ String partitionValues =
internalTableJob.getParam(NBatchConstants.P_REFRESH_PARTITION_VALUES);
+ if ("false".equals(internalTableJob.getParam("incrementalBuild")))
{
executableResponse.setDataRangeEnd(Long.MAX_VALUE);
+ } else if (StringUtils.isNotEmpty(partitionValues)) {
+ partitionValues = partitionValues.replace("[",
"").replace("]", "");
+ executableResponse.setDataRangePartitions(partitionValues);
} else {
executableResponse.setDataRangeStart(Long.parseLong(internalTableJob.getParam("startTime")));
executableResponse.setDataRangeEnd(Long.parseLong(internalTableJob.getParam("endTime")));
@@ -288,8 +293,7 @@ public class ExecutableResponse implements
Comparable<ExecutableResponse> {
/** calculate stage count from segment */
public static double calculateSuccessStageInTaskMap(AbstractExecutable
task,
- Map<String, List<StageExecutable>> stageMap,
- ExecutablePO executablePO) {
+ Map<String, List<StageExecutable>> stageMap, ExecutablePO
executablePO) {
var successStages = 0D;
boolean calculateIndexExecRadio = stageMap.size() == 1;
for (Map.Entry<String, List<StageExecutable>> entry :
stageMap.entrySet()) {
@@ -301,8 +305,7 @@ public class ExecutableResponse implements
Comparable<ExecutableResponse> {
}
public static double calculateSuccessStage(AbstractExecutable task, String
segmentId,
- List<StageExecutable> stageExecutables,
- boolean calculateIndexExecRadio, ExecutablePO executablePO) {
+ List<StageExecutable> stageExecutables, boolean
calculateIndexExecRadio, ExecutablePO executablePO) {
var successStages = 0D;
for (StageExecutable stage : stageExecutables) {
if (ExecutableState.SUCCEED == stage.getStatusInMem(segmentId)
diff --git
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
index 3cda8cac17..7b3aca4def 100644
---
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
+++
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
@@ -551,6 +551,13 @@ public class InternalTableServiceTest extends
AbstractTestCase {
ExecutableState state =
executableManager.getJob(finalJobId3).getStatus();
return state.isFinalState() || state == ExecutableState.ERROR;
});
+
+ // test cancel a full load job
+ internalTableService.truncateInternalTable(PROJECT, TABLE_INDENTITY);
+ jobId = internalTableService
+ .loadIntoInternalTable(PROJECT, table.getName(),
table.getDatabase(), false, false, "", "", null, null)
+ .getJobs().get(0).getJobId();
+ executableManager.discardJob(jobId);
}
@Test
@@ -578,6 +585,7 @@ public class InternalTableServiceTest extends
AbstractTestCase {
void testRefreshPartitions() throws Exception {
KylinConfig config = KylinConfig.getInstanceFromEnv();
NTableMetadataManager tManager =
NTableMetadataManager.getInstance(config, PROJECT);
+ ExecutableManager executableManager =
ExecutableManager.getInstance(config, PROJECT);
TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
internalTableService.createInternalTable(PROJECT, TABLE_INDENTITY, new
String[] { PARTITION_COL }, null,
new HashMap<>(), InternalTableDesc.StorageType.PARQUET.name());
@@ -593,8 +601,9 @@ public class InternalTableServiceTest extends
AbstractTestCase {
when(tableService.getPartitionColumnFormat(any(), any(), any(),
any())).thenReturn("yyyyMM");
internalTableService.updateInternalTable(PROJECT, table.getName(),
table.getDatabase(),
new String[] { DATE_COL }, "yyyyMM", new HashMap<>(),
InternalTableDesc.StorageType.PARQUET.name());
- internalTableService.loadIntoInternalTable(PROJECT, table.getName(),
table.getDatabase(), false, true, "", "",
- new String[] { "199201", "199203" }, null);
+ String jobId = internalTableService.loadIntoInternalTable(PROJECT,
table.getName(), table.getDatabase(), false,
+ true, "", "", new String[] { "199201", "199203" },
null).getJobs().get(0).getJobId();
+ executableManager.discardJob(jobId);
}
@Test
diff --git
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index a697d9c66a..f0f6a8ded5 100644
---
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -1009,11 +1009,6 @@ public class JobServiceTest extends
NLocalFileMetadataTestCase {
response = ExecutableResponse.create(job, null);
Assert.assertEquals(Long.MAX_VALUE, response.getDataRangeEnd());
- job.setParam("incrementalBuild", "true");
- job.setParam("deletePartition", "true");
- response = ExecutableResponse.create(job, null);
- Assert.assertEquals(Long.MAX_VALUE, response.getDataRangeEnd());
-
TableDesc originTable =
manager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
internalManager.createInternalTable(new
InternalTableDesc(originTable));
diff --git
a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableLoadingService.java
b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableLoadingService.java
index ef42c303b1..884f26bb70 100644
---
a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableLoadingService.java
+++
b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableLoadingService.java
@@ -27,6 +27,8 @@ import static
org.apache.kylin.job.execution.JobTypeEnum.INTERNAL_TABLE_REFRESH;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
@@ -39,6 +41,7 @@ import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.engine.spark.builder.InternalTableLoader;
import org.apache.kylin.engine.spark.job.InternalTableLoadJob;
import org.apache.kylin.engine.spark.job.InternalTableUpdateMetadataStep;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.dao.JobStatisticsManager;
import org.apache.kylin.job.execution.JobTypeEnum;
@@ -75,28 +78,22 @@ public class InternalTableLoadingService extends
BasicService {
jobStatisticsManager.updateStatistics(TimeUtil.getDayStart(System.currentTimeMillis()),
0, 0, 1);
InternalTableDesc internalTable =
checkAndGetInternalTables(project, table, database);
InternalTableManager internalTableManager =
InternalTableManager.getInstance(getConfig(), project);
- // refresh partitions
- if (isRefresh && null != partitions && partitions.length > 0) {
- SparkSession ss = SparkSession.getDefaultSession().get();
- InternalTableLoader internalTableLoader = new
InternalTableLoader();
- internalTableLoader.loadInternalTable(ss, internalTable, new
String[] { startDate, endDate },
- partitions,
internalTable.getStorageType().getFormat(), isIncremental);
- } else {
- checkBeforeSubmit(internalTable, jobType, isIncremental,
isRefresh, startDate, endDate);
- logger.info(
- "create internal table loading job for table: {},
isIncrementBuild: {}, startTime: {}, endTime: {}",
- internalTable.getIdentity(), isIncremental, startDate,
endDate);
- JobParam jobParam = new
JobParam().withProject(project).withTable(internalTable.getIdentity())
-
.withYarnQueue(yarnQueue).withJobTypeEnum(jobType).withOwner(BasicService.getUsername())
- .addExtParams(NBatchConstants.P_INCREMENTAL_BUILD,
String.valueOf(isIncremental))
- .addExtParams(NBatchConstants.P_OUTPUT_MODE,
String.valueOf(isRefresh))
- .addExtParams(NBatchConstants.P_START_DATE, startDate)
- .addExtParams(NBatchConstants.P_END_DATE, endDate);
- String jobId =
getManager(SourceUsageManager.class).licenseCheckWrap(project,
- () -> getManager(JobManager.class,
project).addJob(jobParam));
- jobIds.add(jobId);
- internalTableManager.saveOrUpdateInternalTable(internalTable);
- }
+ checkBeforeSubmit(internalTable, isIncremental, isRefresh,
startDate, endDate, partitions);
+ logger.info(
+ "create internal table loading job for table: {},
isIncrementBuild: {}, startTime: {}, endTime: {}",
+ internalTable.getIdentity(), isIncremental, startDate,
endDate);
+ String partitionValues = null == partitions ? "" :
Arrays.toString(partitions);
+ JobParam jobParam = new
JobParam().withProject(project).withTable(internalTable.getIdentity())
+
.withYarnQueue(yarnQueue).withJobTypeEnum(jobType).withOwner(BasicService.getUsername())
+ .addExtParams(NBatchConstants.P_INCREMENTAL_BUILD,
String.valueOf(isIncremental))
+ .addExtParams(NBatchConstants.P_OUTPUT_MODE,
String.valueOf(isRefresh))
+ .addExtParams(NBatchConstants.P_START_DATE, startDate)
+ .addExtParams(NBatchConstants.P_END_DATE, endDate)
+ .addExtParams(NBatchConstants.P_REFRESH_PARTITION_VALUES,
partitionValues);
+ String jobId =
getManager(SourceUsageManager.class).licenseCheckWrap(project,
+ () -> getManager(JobManager.class,
project).addJob(jobParam));
+ jobIds.add(jobId);
+ internalTableManager.saveOrUpdateInternalTable(internalTable);
return true;
}, project);
String jobName = isRefresh ? INTERNAL_TABLE_REFRESH.toString() :
INTERNAL_TABLE_BUILD.toString();
@@ -108,42 +105,53 @@ public class InternalTableLoadingService extends
BasicService {
* @param startDate
* @param endDate
*/
- private void checkBeforeSubmit(InternalTableDesc internalTable,
JobTypeEnum jobType, boolean isIncremental,
- boolean isRefresh, String startDate, String endDate) throws
KylinException {
+ private void checkBeforeSubmit(InternalTableDesc internalTable, boolean
isIncremental, boolean isRefresh,
+ String startDate, String endDate, String[] partitions) throws
Exception {
if (isIncremental && (Objects.isNull(internalTable.getTablePartition())
||
Objects.isNull(internalTable.getTablePartition().getPartitionColumns())
||
internalTable.getTablePartition().getPartitionColumns().length == 0)) {
String errorMsg = String.format(Locale.ROOT,
MsgPicker.getMsg().getInternalTableUnpartitioned());
throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
}
+ partitions = null == partitions ? new String[] {} : partitions;
// check job_range overlap?
InternalTablePartition tablePartition =
internalTable.getTablePartition();
List<String[]> jobRange = internalTable.getJobRange();
- String[] curJobRange = new String[] { "0", "0" };
+ List<String[]> curRange = Lists.newArrayList();
+ if (!isIncremental) {
+ curRange.add(new String[] { "0", "0" });
+ }
String timeFmt = Objects.isNull(tablePartition) ? "" :
tablePartition.getDatePartitionFormat();
+ if (isRefresh && partitions.length > 0 &&
StringUtils.isNotEmpty(timeFmt)) {
+
curRange.addAll(DataRangeUtils.mergeTimeRange(Arrays.asList(partitions),
timeFmt));
+ }
if (StringUtils.isNotEmpty(startDate) &&
StringUtils.isNotEmpty(timeFmt)) {
SimpleDateFormat fmt = new SimpleDateFormat(timeFmt, Locale.ROOT);
- String start = StringUtils.isEmpty(startDate) ? "0" :
fmt.format(Long.parseLong(startDate));
- String end = StringUtils.isEmpty(endDate) ? "0" :
fmt.format(Long.parseLong(endDate));
- curJobRange = new String[] { start, end };
+ String start = fmt.format(Long.parseLong(startDate));
+ String end = fmt.format(Long.parseLong(endDate));
+ curRange.add(new String[] { start, end });
}
// non-time partition table can not submit incremental job
- if (isIncremental && !Objects.isNull(tablePartition)
+ if (isIncremental && !Objects.isNull(tablePartition) &&
StringUtils.isNotEmpty(startDate)
&&
StringUtils.isEmpty(tablePartition.getDatePartitionFormat())) {
String errorMsg = String.format(Locale.ROOT,
MsgPicker.getMsg().getNonTimeInternalTableIncrementalBuild());
throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
}
- if (DataRangeUtils.timeOverlap(internalTable.getJobRange(),
curJobRange, timeFmt)) {
- String errorMsg = String.format(Locale.ROOT,
MsgPicker.getMsg().getTimeRangeOverlap());
- throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
- }
- // check refresh out of data range
- if (isRefresh && !DataRangeUtils.timeInRange(curJobRange,
internalTable.getPartitionRange(), timeFmt)) {
- String errorMsg = String.format(Locale.ROOT,
MsgPicker.getMsg().getTimeOutOfRange());
- throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
- }
- logger.info(jobType.getCategory());
- jobRange.add(curJobRange);
+ String[] finalPartitions = partitions;
+ curRange.forEach(range -> {
+ if (DataRangeUtils.timeOverlap(internalTable.getJobRange(), range,
timeFmt)) {
+ String errorMsg = String.format(Locale.ROOT,
MsgPicker.getMsg().getTimeRangeOverlap());
+ throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
+ }
+ // check refresh out of data range(exclude specify partitions)
+ if (isRefresh && finalPartitions.length == 0
+ && !DataRangeUtils.timeInRange(range,
internalTable.getPartitionRange(), timeFmt)) {
+ String errorMsg = String.format(Locale.ROOT,
MsgPicker.getMsg().getInternalTableUnpartitioned());
+ throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
+ }
+ });
+ jobRange.addAll(curRange);
+ jobRange.sort(Comparator.comparing(valueA -> valueA[0]));
internalTable.setJobRange(jobRange);
}
@@ -168,6 +176,11 @@ public class InternalTableLoadingService extends
BasicService {
tablePartition.setPartitionValues(info.getPartitionValues());
tablePartition.setPartitionDetails(info.getPartitionDetails());
oldTable.setRowCount(info.getFinalCount());
+ if
(StringUtils.isNotEmpty(tablePartition.getDatePartitionFormat())) {
+ List<String[]> partitionRange =
DataRangeUtils.mergeTimeRange(tablePartition.getPartitionValues(),
+ tablePartition.getDatePartitionFormat());
+ oldTable.setPartitionRange(partitionRange);
+ }
internalTableManager.saveOrUpdateInternalTable(oldTable);
return true;
}, project);
diff --git
a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
index 4496ac5a0f..33520a4e86 100644
---
a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
+++
b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
@@ -406,6 +406,9 @@ public class InternalTableService extends BasicService {
if (isIncremental) {
DataRangeUtils.validateRange(startDate, endDate);
}
+ if (null != partitions && partitions.length > 0) {
+ isIncremental = true;
+ }
// treat full refresh as full load processing.
if (!isIncremental && isRefresh) {
isRefresh = false;
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
index 10f8b4562e..a6347ae760 100644
---
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
@@ -72,9 +72,15 @@ public class InternalTableLoadJob extends SparkApplication {
boolean incrementalBuild =
"true".equals(getParam(NBatchConstants.P_INCREMENTAL_BUILD));
String startDate = getParam(NBatchConstants.P_START_DATE);
String endDate = getParam(NBatchConstants.P_END_DATE);
+ String refreshPartitions =
getParam(NBatchConstants.P_REFRESH_PARTITION_VALUES).replace("[",
"").replace("]",
+ "");
+ String[] partitions = new String[] {};
+ if (StringUtils.isNotEmpty(refreshPartitions)) {
+ partitions = StringUtils.isEmpty(refreshPartitions) ? new String[]
{} : refreshPartitions.split(", ");
+ }
String storagePolicy = config.getGlutenStoragePolicy();
InternalTableLoader loader = new InternalTableLoader();
- loader.loadInternalTable(ss, internalTable, new String[] { startDate,
endDate }, null, storagePolicy,
+ loader.loadInternalTable(ss, internalTable, new String[] { startDate,
endDate }, partitions, storagePolicy,
incrementalBuild);
}
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
index efb331e09a..c73a9d7f44 100644
---
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
@@ -18,9 +18,7 @@
package org.apache.kylin.engine.spark.job;
-import java.text.SimpleDateFormat;
import java.util.List;
-import java.util.Locale;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -74,6 +72,7 @@ public class InternalTableLoadingJob extends
DefaultExecutableOnTable {
job.setParam(NBatchConstants.P_END_DATE, param.getEndDate());
job.setParam(NBatchConstants.P_DELETE_PARTITION_VALUES,
param.getDeletePartitionValues());
job.setParam(NBatchConstants.P_DELETE_PARTITION,
param.getDeletePartition());
+ job.setParam(NBatchConstants.P_REFRESH_PARTITION_VALUES,
param.getRefreshPartitionValues());
KylinConfig config = KylinConfig.getInstanceFromEnv();
StepEnum.BUILD_INTERNAL.create(job, config);
StepEnum.UPDATE_METADATA.create(job, config);
@@ -114,15 +113,10 @@ public class InternalTableLoadingJob extends
DefaultExecutableOnTable {
String tableName = getParam(NBatchConstants.P_TABLE_NAME);
String startDate = getParam(NBatchConstants.P_START_DATE);
String endDate = getParam(NBatchConstants.P_END_DATE);
+ String refreshPartitions =
getParam(NBatchConstants.P_REFRESH_PARTITION_VALUES).replace("[", "")
+ .replace("]", "");
+ boolean isIncremental =
"true".equals(getParam(NBatchConstants.P_INCREMENTAL_BUILD));
InternalTablePartition tablePartition =
internalTable.getTablePartition();
- // release current job_range
- String[] curJobRange = new String[] { "0", "0" };
- if (null != tablePartition &&
StringUtils.isNotEmpty(tablePartition.getDatePartitionFormat())
- && StringUtils.isNotEmpty(startDate)) {
- SimpleDateFormat fmt = new
SimpleDateFormat(tablePartition.getDatePartitionFormat(), Locale.ROOT);
- curJobRange = new String[] {
fmt.format(Long.parseLong(startDate)),
- fmt.format(Long.parseLong(endDate)) };
- }
// merge latest partition_range
logger.info("starting merging delta partitions for internal table
{}", tableName);
if (null != tablePartition &&
StringUtils.isNotEmpty(tablePartition.getDatePartitionFormat())) {
@@ -130,12 +124,11 @@ public class InternalTableLoadingJob extends
DefaultExecutableOnTable {
tablePartition.getDatePartitionFormat());
internalTable.setPartitionRange(partitionRange);
}
- List<String[]> jobRange = internalTable.getJobRange();
- String[] finalCurJobRange = curJobRange;
- jobRange.removeIf(rang -> rang[0].equals(finalCurJobRange[0]) &&
rang[1].equals(finalCurJobRange[1]));
- internalTable.setJobRange(jobRange);
+ // release current job_range
+ InternalTableUpdateMetadataStep metadataStep = new
InternalTableUpdateMetadataStep();
+ metadataStep.releaseJobRange(internalTable, isIncremental,
startDate, endDate, refreshPartitions);
internalTableManager.saveOrUpdateInternalTable(internalTable);
- logger.info("release job_range for internal table {} , range {}.",
tableName, jobRange);
+ logger.info("release job_range for internal table {} ", tableName);
return true;
}, getProject());
}
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableUpdateMetadataStep.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableUpdateMetadataStep.java
index 46c3c2065a..abf20ba3a7 100644
---
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableUpdateMetadataStep.java
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableUpdateMetadataStep.java
@@ -21,6 +21,7 @@ package org.apache.kylin.engine.spark.job;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.spark.builder.InternalTableLoader;
import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.job.JobContext;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
@@ -92,6 +94,9 @@ public class InternalTableUpdateMetadataStep extends
AbstractExecutable {
String project = getParam(NBatchConstants.P_PROJECT_NAME);
String startDate = getParam(NBatchConstants.P_START_DATE);
String endDate = getParam(NBatchConstants.P_END_DATE);
+ String refreshPartitions =
getParam(NBatchConstants.P_REFRESH_PARTITION_VALUES).replace("[", "")
+ .replace("]", "");
+ boolean isIncremental =
"true".equals(getParam(NBatchConstants.P_INCREMENTAL_BUILD));
// fetch delta partition info
InternalTableManager internalTableManager =
InternalTableManager.getInstance(config, project);
InternalTableDesc internalTable =
internalTableManager.getInternalTableDesc(tableName);
@@ -109,21 +114,7 @@ public class InternalTableUpdateMetadataStep extends
AbstractExecutable {
tablePartition.getDatePartitionFormat());
internalTable.setPartitionRange(partitionRange);
}
- // release current job_range
- String[] curJobRange = new String[] { "0", "0" };
- if (null != tablePartition &&
StringUtils.isNotEmpty(tablePartition.getDatePartitionFormat())
- && StringUtils.isNotEmpty(startDate)) {
- SimpleDateFormat fmt = new
SimpleDateFormat(tablePartition.getDatePartitionFormat(), Locale.ROOT);
- curJobRange = new String[] {
fmt.format(Long.parseLong(startDate)),
- fmt.format(Long.parseLong(endDate)) };
- }
- List<String[]> jobRange = internalTable.getJobRange();
- String[] finalCurJobRange = curJobRange;
- jobRange.removeIf(rang -> rang[0].equals(finalCurJobRange[0]) &&
rang[1].equals(finalCurJobRange[1]));
- internalTable.setJobRange(jobRange);
- logger.info("trying to release job_range for internal table {} ,
range {}.",
- internalTable.getTableDesc().getTableAlias(), jobRange);
-
+ releaseJobRange(internalTable, isIncremental, startDate, endDate,
refreshPartitions);
internalTableManager.saveOrUpdateInternalTable(internalTable);
logger.info("update metadata for internal table {} cost: {} ms.",
internalTable.getTableDesc().getTableAlias(),
(System.currentTimeMillis() - startTime));
@@ -131,6 +122,36 @@ public class InternalTableUpdateMetadataStep extends
AbstractExecutable {
}, project);
}
+ public void releaseJobRange(InternalTableDesc internalTable, boolean
isIncremental, String startDate,
+ String endDate, String refreshPartitions) throws Exception {
+ InternalTablePartition tablePartition =
internalTable.getTablePartition();
+ // release current job_range
+ logger.info("starting releasing job_range");
+ List<String[]> curRange = Lists.newArrayList();
+ if (!isIncremental) {
+ curRange.add(new String[] { "0", "0" });
+ }
+ if (StringUtils.isNotEmpty(refreshPartitions)
+ &&
StringUtils.isNotEmpty(tablePartition.getDatePartitionFormat())) {
+ String[] partitions = refreshPartitions.split(", ");
+ curRange.addAll(
+ DataRangeUtils.mergeTimeRange(Arrays.asList(partitions),
tablePartition.getDatePartitionFormat()));
+ }
+ if (null != tablePartition &&
StringUtils.isNotEmpty(tablePartition.getDatePartitionFormat())
+ && StringUtils.isNotEmpty(startDate)) {
+ SimpleDateFormat fmt = new
SimpleDateFormat(tablePartition.getDatePartitionFormat(), Locale.ROOT);
+ curRange.add(new String[] { fmt.format(Long.parseLong(startDate)),
fmt.format(Long.parseLong(endDate)) });
+ }
+ List<String[]> jobRange = internalTable.getJobRange();
+ curRange.forEach(curJobRange -> {
+ jobRange.removeIf(range -> range[0].equals(curJobRange[0]) &&
range[1].equals(curJobRange[1]));
+ });
+ internalTable.setJobRange(jobRange);
+ logger.info("trying to release job_range for internal table {} , range
{}.",
+ internalTable.getTableDesc().getTableAlias(), jobRange);
+
+ }
+
public InternalTableLoadJob.InternalTableMetaUpdateInfo
extractUpdateInfo(String project, String tableName,
KylinConfig config, SparkSession ss) {
InternalTableManager internalTableManager =
InternalTableManager.getInstance(config, project);
diff --git
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
index 218b672dbf..d2927a63e3 100644
---
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
+++
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
@@ -25,7 +25,7 @@ import java.util.Locale
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.kylin.common.exception.{CommonErrorCode, KylinException}
-import org.apache.kylin.common.util.{DateFormat, HadoopUtil}
+import org.apache.kylin.common.util.HadoopUtil
import org.apache.kylin.engine.spark.utils.SparkDataSource._
import org.apache.kylin.metadata.cube.model.NBatchConstants
import org.apache.kylin.metadata.table.InternalTableDesc
@@ -102,9 +102,6 @@ class InternalTableLoader extends Logging {
}
val format = table.getStorageType.getFormat
if (incremental) {
- val dateFormat = table.getTablePartition.getDatePartitionFormat
- logInfo(f"Refresh dynamic partitions
[${DateFormat.formatToDateStr(startDate.toLong, dateFormat)}," +
- f" ${DateFormat.formatToDateStr(endDate.toLong, dateFormat)})")
ss.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
}
writer.format(format).mode(outPutMode).save(location)