This is an automated email from the ASF dual-hosted git repository.

jlfsdtc pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new abdc910760 KYLIN-6078 submit jobs when refresh specify partitions
abdc910760 is described below

commit abdc9107606844e56d1581079a0ec3a62f04e953
Author: jlf <[email protected]>
AuthorDate: Mon Sep 15 10:56:03 2025 +0800

    KYLIN-6078 submit jobs when refresh specify partitions
---
 .../rest/request/InternalTableBuildRequest.java    |  2 +-
 .../kylin/job/handler/InternalTableJobHandler.java |  2 +
 .../kylin/metadata/cube/model/NBatchConstants.java |  8 +-
 .../java/org/apache/kylin/util/DataRangeUtils.java |  8 +-
 .../kylin/rest/response/ExecutableResponse.java    | 15 ++--
 .../rest/service/InternalTableServiceTest.java     | 13 ++-
 .../apache/kylin/rest/service/JobServiceTest.java  |  5 --
 .../rest/service/InternalTableLoadingService.java  | 93 ++++++++++++----------
 .../kylin/rest/service/InternalTableService.java   |  3 +
 .../engine/spark/job/InternalTableLoadJob.java     |  8 +-
 .../engine/spark/job/InternalTableLoadingJob.java  | 23 ++----
 .../spark/job/InternalTableUpdateMetadataStep.java | 51 ++++++++----
 .../engine/spark/builder/InternalTableLoader.scala |  5 +-
 13 files changed, 141 insertions(+), 95 deletions(-)

diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/request/InternalTableBuildRequest.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/request/InternalTableBuildRequest.java
index 434f1bdbaf..49359e817c 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/request/InternalTableBuildRequest.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/request/InternalTableBuildRequest.java
@@ -41,6 +41,6 @@ public class InternalTableBuildRequest {
     private String yarnQueue;
 
     @JsonProperty("partitions")
-    private String[] partitions;
+    private String[] partitions = new String[] {};
 
 }
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
index d127ccb76c..908f4f8c75 100644
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
@@ -39,6 +39,7 @@ public class InternalTableJobHandler extends 
AbstractJobHandler {
         private String endDate;
         private String deletePartitionValues;
         private String deletePartition; // true or false
+        private String refreshPartitionValues;
 
         public InternalTableJobBuildParam(JobParam jobParam) {
             super(null, null, jobParam.getOwner(), jobParam.getJobTypeEnum(), 
jobParam.getJobId(), null, null, null,
@@ -51,6 +52,7 @@ public class InternalTableJobHandler extends 
AbstractJobHandler {
             this.endDate = 
jobParam.getExtParams().get(NBatchConstants.P_END_DATE);
             this.deletePartitionValues = 
jobParam.getExtParams().get(NBatchConstants.P_DELETE_PARTITION_VALUES);
             this.deletePartition = 
jobParam.getExtParams().get(NBatchConstants.P_DELETE_PARTITION);
+            this.refreshPartitionValues = 
jobParam.getExtParams().get(NBatchConstants.P_REFRESH_PARTITION_VALUES);
         }
     }
 
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
index fab3fab21a..ea34305552 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
@@ -76,7 +76,7 @@ public interface NBatchConstants {
     String P_DELETE_PARTITION = "deletePartition";
     String P_SORT_BY_PARTITION_BEFORE_SAVE = "sortByPartition";
     String P_PRELOADED_CACHE = "preloadedCache";
-
+    String P_REFRESH_PARTITION_VALUES = "refreshPartitionValues";
 
     /** index planner job parameters */
     String P_PLANNER_INITIALIZE_CUBOID_COUNT = 
"kylin.planner.initializeCuboidCount";
@@ -92,10 +92,8 @@ public interface NBatchConstants {
 
     @Getter
     enum TblPropertyKey {
-        PRIMARY_KEY(P_PRIMARY_KEY),
-        ORDER_BY_KEY(P_ORDER_BY_KEY),
-        BUCKET_COLUMN(P_BUCKET_COLUMN),
-        BUCKET_NUM(P_BUCKET_NUM);
+        PRIMARY_KEY(P_PRIMARY_KEY), ORDER_BY_KEY(P_ORDER_BY_KEY), 
BUCKET_COLUMN(P_BUCKET_COLUMN), BUCKET_NUM(
+                P_BUCKET_NUM);
 
         private final String value;
 
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/util/DataRangeUtils.java 
b/src/core-metadata/src/main/java/org/apache/kylin/util/DataRangeUtils.java
index ccbc6556ce..f23cba0205 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/util/DataRangeUtils.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/util/DataRangeUtils.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.metadata.model.PartitionDesc;
 
 public final class DataRangeUtils {
@@ -169,13 +170,18 @@ public final class DataRangeUtils {
         if (values == null || values.isEmpty()) {
             return mergedRanges;
         }
-
         SimpleDateFormat sdf = new SimpleDateFormat(dateFormat, Locale.ROOT);
         try {
             List<Date> dates = new ArrayList<>();
             for (String value : values) {
+                if (StringUtils.isEmpty(value)) {
+                    continue;
+                }
                 dates.add(sdf.parse(value));
             }
+            if (dates.isEmpty()) {
+                return Lists.newArrayList();
+            }
             // Sort the dates
             dates.sort(Date::compareTo);
             Date start = dates.get(0);
diff --git 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
index ddf6635482..24080785fe 100644
--- 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
+++ 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
@@ -84,6 +84,8 @@ public class ExecutableResponse implements 
Comparable<ExecutableResponse> {
     private ExecutableState schedulerState;
     @JsonProperty("job_name")
     private String jobName;
+    @JsonProperty("data_range_partitions")
+    private String dataRangePartitions;
     @JsonProperty("data_range_start")
     private long dataRangeStart;
     @JsonProperty("data_range_end")
@@ -202,9 +204,12 @@ public class ExecutableResponse implements 
Comparable<ExecutableResponse> {
             }
         } else if (abstractExecutable instanceof InternalTableLoadingJob) {
             InternalTableLoadingJob internalTableJob = 
(InternalTableLoadingJob) abstractExecutable;
-            if ("false".equals(internalTableJob.getParam("incrementalBuild"))
-                    || 
"true".equals(internalTableJob.getParam("deletePartition"))) {
+            String partitionValues = 
internalTableJob.getParam(NBatchConstants.P_REFRESH_PARTITION_VALUES);
+            if ("false".equals(internalTableJob.getParam("incrementalBuild"))) 
{
                 executableResponse.setDataRangeEnd(Long.MAX_VALUE);
+            } else if (StringUtils.isNotEmpty(partitionValues)) {
+                partitionValues = partitionValues.replace("[", 
"").replace("]", "");
+                executableResponse.setDataRangePartitions(partitionValues);
             } else {
                 
executableResponse.setDataRangeStart(Long.parseLong(internalTableJob.getParam("startTime")));
                 
executableResponse.setDataRangeEnd(Long.parseLong(internalTableJob.getParam("endTime")));
@@ -288,8 +293,7 @@ public class ExecutableResponse implements 
Comparable<ExecutableResponse> {
 
     /** calculate stage count from segment */
     public static double calculateSuccessStageInTaskMap(AbstractExecutable 
task,
-            Map<String, List<StageExecutable>> stageMap,
-            ExecutablePO executablePO) {
+            Map<String, List<StageExecutable>> stageMap, ExecutablePO 
executablePO) {
         var successStages = 0D;
         boolean calculateIndexExecRadio = stageMap.size() == 1;
         for (Map.Entry<String, List<StageExecutable>> entry : 
stageMap.entrySet()) {
@@ -301,8 +305,7 @@ public class ExecutableResponse implements 
Comparable<ExecutableResponse> {
     }
 
     public static double calculateSuccessStage(AbstractExecutable task, String 
segmentId,
-            List<StageExecutable> stageExecutables,
-            boolean calculateIndexExecRadio, ExecutablePO executablePO) {
+            List<StageExecutable> stageExecutables, boolean 
calculateIndexExecRadio, ExecutablePO executablePO) {
         var successStages = 0D;
         for (StageExecutable stage : stageExecutables) {
             if (ExecutableState.SUCCEED == stage.getStatusInMem(segmentId)
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
index 3cda8cac17..7b3aca4def 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
@@ -551,6 +551,13 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
             ExecutableState state = 
executableManager.getJob(finalJobId3).getStatus();
             return state.isFinalState() || state == ExecutableState.ERROR;
         });
+
+        // test discarding a full-load job right after it has been submitted
+        internalTableService.truncateInternalTable(PROJECT, TABLE_INDENTITY);
+        jobId = internalTableService
+                .loadIntoInternalTable(PROJECT, table.getName(), 
table.getDatabase(), false, false, "", "", null, null)
+                .getJobs().get(0).getJobId();
+        executableManager.discardJob(jobId);
     }
 
     @Test
@@ -578,6 +585,7 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
     void testRefreshPartitions() throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         NTableMetadataManager tManager = 
NTableMetadataManager.getInstance(config, PROJECT);
+        ExecutableManager executableManager = 
ExecutableManager.getInstance(config, PROJECT);
         TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
         internalTableService.createInternalTable(PROJECT, TABLE_INDENTITY, new 
String[] { PARTITION_COL }, null,
                 new HashMap<>(), InternalTableDesc.StorageType.PARQUET.name());
@@ -593,8 +601,9 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
         when(tableService.getPartitionColumnFormat(any(), any(), any(), 
any())).thenReturn("yyyyMM");
         internalTableService.updateInternalTable(PROJECT, table.getName(), 
table.getDatabase(),
                 new String[] { DATE_COL }, "yyyyMM", new HashMap<>(), 
InternalTableDesc.StorageType.PARQUET.name());
-        internalTableService.loadIntoInternalTable(PROJECT, table.getName(), 
table.getDatabase(), false, true, "", "",
-                new String[] { "199201", "199203" }, null);
+        String jobId = internalTableService.loadIntoInternalTable(PROJECT, 
table.getName(), table.getDatabase(), false,
+                true, "", "", new String[] { "199201", "199203" }, 
null).getJobs().get(0).getJobId();
+        executableManager.discardJob(jobId);
     }
 
     @Test
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index a697d9c66a..f0f6a8ded5 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -1009,11 +1009,6 @@ public class JobServiceTest extends 
NLocalFileMetadataTestCase {
         response = ExecutableResponse.create(job, null);
         Assert.assertEquals(Long.MAX_VALUE, response.getDataRangeEnd());
 
-        job.setParam("incrementalBuild", "true");
-        job.setParam("deletePartition", "true");
-        response = ExecutableResponse.create(job, null);
-        Assert.assertEquals(Long.MAX_VALUE, response.getDataRangeEnd());
-
         TableDesc originTable = 
manager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
         internalManager.createInternalTable(new 
InternalTableDesc(originTable));
 
diff --git 
a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableLoadingService.java
 
b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableLoadingService.java
index ef42c303b1..884f26bb70 100644
--- 
a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableLoadingService.java
+++ 
b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableLoadingService.java
@@ -27,6 +27,8 @@ import static 
org.apache.kylin.job.execution.JobTypeEnum.INTERNAL_TABLE_REFRESH;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
@@ -39,6 +41,7 @@ import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.engine.spark.builder.InternalTableLoader;
 import org.apache.kylin.engine.spark.job.InternalTableLoadJob;
 import org.apache.kylin.engine.spark.job.InternalTableUpdateMetadataStep;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.job.dao.JobStatisticsManager;
 import org.apache.kylin.job.execution.JobTypeEnum;
@@ -75,28 +78,22 @@ public class InternalTableLoadingService extends 
BasicService {
             
jobStatisticsManager.updateStatistics(TimeUtil.getDayStart(System.currentTimeMillis()),
 0, 0, 1);
             InternalTableDesc internalTable = 
checkAndGetInternalTables(project, table, database);
             InternalTableManager internalTableManager = 
InternalTableManager.getInstance(getConfig(), project);
-            // refresh partitions
-            if (isRefresh && null != partitions && partitions.length > 0) {
-                SparkSession ss = SparkSession.getDefaultSession().get();
-                InternalTableLoader internalTableLoader = new 
InternalTableLoader();
-                internalTableLoader.loadInternalTable(ss, internalTable, new 
String[] { startDate, endDate },
-                        partitions, 
internalTable.getStorageType().getFormat(), isIncremental);
-            } else {
-                checkBeforeSubmit(internalTable, jobType, isIncremental, 
isRefresh, startDate, endDate);
-                logger.info(
-                        "create internal table loading job for table: {}, 
isIncrementBuild: {}, startTime: {}, endTime: {}",
-                        internalTable.getIdentity(), isIncremental, startDate, 
endDate);
-                JobParam jobParam = new 
JobParam().withProject(project).withTable(internalTable.getIdentity())
-                        
.withYarnQueue(yarnQueue).withJobTypeEnum(jobType).withOwner(BasicService.getUsername())
-                        .addExtParams(NBatchConstants.P_INCREMENTAL_BUILD, 
String.valueOf(isIncremental))
-                        .addExtParams(NBatchConstants.P_OUTPUT_MODE, 
String.valueOf(isRefresh))
-                        .addExtParams(NBatchConstants.P_START_DATE, startDate)
-                        .addExtParams(NBatchConstants.P_END_DATE, endDate);
-                String jobId = 
getManager(SourceUsageManager.class).licenseCheckWrap(project,
-                        () -> getManager(JobManager.class, 
project).addJob(jobParam));
-                jobIds.add(jobId);
-                internalTableManager.saveOrUpdateInternalTable(internalTable);
-            }
+            checkBeforeSubmit(internalTable, isIncremental, isRefresh, 
startDate, endDate, partitions);
+            logger.info(
+                    "create internal table loading job for table: {}, 
isIncrementBuild: {}, startTime: {}, endTime: {}",
+                    internalTable.getIdentity(), isIncremental, startDate, 
endDate);
+            String partitionValues = null == partitions ? "" : 
Arrays.toString(partitions);
+            JobParam jobParam = new 
JobParam().withProject(project).withTable(internalTable.getIdentity())
+                    
.withYarnQueue(yarnQueue).withJobTypeEnum(jobType).withOwner(BasicService.getUsername())
+                    .addExtParams(NBatchConstants.P_INCREMENTAL_BUILD, 
String.valueOf(isIncremental))
+                    .addExtParams(NBatchConstants.P_OUTPUT_MODE, 
String.valueOf(isRefresh))
+                    .addExtParams(NBatchConstants.P_START_DATE, startDate)
+                    .addExtParams(NBatchConstants.P_END_DATE, endDate)
+                    .addExtParams(NBatchConstants.P_REFRESH_PARTITION_VALUES, 
partitionValues);
+            String jobId = 
getManager(SourceUsageManager.class).licenseCheckWrap(project,
+                    () -> getManager(JobManager.class, 
project).addJob(jobParam));
+            jobIds.add(jobId);
+            internalTableManager.saveOrUpdateInternalTable(internalTable);
             return true;
         }, project);
         String jobName = isRefresh ? INTERNAL_TABLE_REFRESH.toString() : 
INTERNAL_TABLE_BUILD.toString();
@@ -108,42 +105,53 @@ public class InternalTableLoadingService extends 
BasicService {
      * @param startDate
      * @param endDate
      */
-    private void checkBeforeSubmit(InternalTableDesc internalTable, 
JobTypeEnum jobType, boolean isIncremental,
-            boolean isRefresh, String startDate, String endDate) throws 
KylinException {
+    private void checkBeforeSubmit(InternalTableDesc internalTable, boolean 
isIncremental, boolean isRefresh,
+            String startDate, String endDate, String[] partitions) throws 
Exception {
         if (isIncremental && (Objects.isNull(internalTable.getTablePartition())
                 || 
Objects.isNull(internalTable.getTablePartition().getPartitionColumns())
                 || 
internalTable.getTablePartition().getPartitionColumns().length == 0)) {
             String errorMsg = String.format(Locale.ROOT, 
MsgPicker.getMsg().getInternalTableUnpartitioned());
             throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
         }
+        partitions = null == partitions ? new String[] {} : partitions;
         // check job_range overlap?
         InternalTablePartition tablePartition = 
internalTable.getTablePartition();
         List<String[]> jobRange = internalTable.getJobRange();
-        String[] curJobRange = new String[] { "0", "0" };
+        List<String[]> curRange = Lists.newArrayList();
+        if (!isIncremental) {
+            curRange.add(new String[] { "0", "0" });
+        }
         String timeFmt = Objects.isNull(tablePartition) ? "" : 
tablePartition.getDatePartitionFormat();
+        if (isRefresh && partitions.length > 0 && 
StringUtils.isNotEmpty(timeFmt)) {
+            
curRange.addAll(DataRangeUtils.mergeTimeRange(Arrays.asList(partitions), 
timeFmt));
+        }
         if (StringUtils.isNotEmpty(startDate) && 
StringUtils.isNotEmpty(timeFmt)) {
             SimpleDateFormat fmt = new SimpleDateFormat(timeFmt, Locale.ROOT);
-            String start = StringUtils.isEmpty(startDate) ? "0" : 
fmt.format(Long.parseLong(startDate));
-            String end = StringUtils.isEmpty(endDate) ? "0" : 
fmt.format(Long.parseLong(endDate));
-            curJobRange = new String[] { start, end };
+            String start = fmt.format(Long.parseLong(startDate));
+            String end = fmt.format(Long.parseLong(endDate));
+            curRange.add(new String[] { start, end });
         }
         // non-time partition table can not submit incremental job
-        if (isIncremental && !Objects.isNull(tablePartition)
+        if (isIncremental && !Objects.isNull(tablePartition) && 
StringUtils.isNotEmpty(startDate)
                 && 
StringUtils.isEmpty(tablePartition.getDatePartitionFormat())) {
             String errorMsg = String.format(Locale.ROOT, 
MsgPicker.getMsg().getNonTimeInternalTableIncrementalBuild());
             throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
         }
-        if (DataRangeUtils.timeOverlap(internalTable.getJobRange(), 
curJobRange, timeFmt)) {
-            String errorMsg = String.format(Locale.ROOT, 
MsgPicker.getMsg().getTimeRangeOverlap());
-            throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
-        }
-        // check refresh out of data range
-        if (isRefresh && !DataRangeUtils.timeInRange(curJobRange, 
internalTable.getPartitionRange(), timeFmt)) {
-            String errorMsg = String.format(Locale.ROOT, 
MsgPicker.getMsg().getTimeOutOfRange());
-            throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
-        }
-        logger.info(jobType.getCategory());
-        jobRange.add(curJobRange);
+        String[] finalPartitions = partitions;
+        curRange.forEach(range -> {
+            if (DataRangeUtils.timeOverlap(internalTable.getJobRange(), range, 
timeFmt)) {
+                String errorMsg = String.format(Locale.ROOT, 
MsgPicker.getMsg().getTimeRangeOverlap());
+                throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
+            }
+            // a refresh must stay within the loaded data range (check skipped when explicit partitions are given)
+            if (isRefresh && finalPartitions.length == 0
+                    && !DataRangeUtils.timeInRange(range, 
internalTable.getPartitionRange(), timeFmt)) {
+                String errorMsg = String.format(Locale.ROOT, 
MsgPicker.getMsg().getInternalTableUnpartitioned());
+                throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
+            }
+        });
+        jobRange.addAll(curRange);
+        jobRange.sort(Comparator.comparing(valueA -> valueA[0]));
         internalTable.setJobRange(jobRange);
     }
 
@@ -168,6 +176,11 @@ public class InternalTableLoadingService extends 
BasicService {
             tablePartition.setPartitionValues(info.getPartitionValues());
             tablePartition.setPartitionDetails(info.getPartitionDetails());
             oldTable.setRowCount(info.getFinalCount());
+            if 
(StringUtils.isNotEmpty(tablePartition.getDatePartitionFormat())) {
+                List<String[]> partitionRange = 
DataRangeUtils.mergeTimeRange(tablePartition.getPartitionValues(),
+                        tablePartition.getDatePartitionFormat());
+                oldTable.setPartitionRange(partitionRange);
+            }
             internalTableManager.saveOrUpdateInternalTable(oldTable);
             return true;
         }, project);
diff --git 
a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
 
b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
index 4496ac5a0f..33520a4e86 100644
--- 
a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
+++ 
b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
@@ -406,6 +406,9 @@ public class InternalTableService extends BasicService {
         if (isIncremental) {
             DataRangeUtils.validateRange(startDate, endDate);
         }
+        if (null != partitions && partitions.length > 0) {
+            isIncremental = true;
+        }
         // treat full refresh as full load processing.
         if (!isIncremental && isRefresh) {
             isRefresh = false;
diff --git 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
index 10f8b4562e..a6347ae760 100644
--- 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
+++ 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
@@ -72,9 +72,15 @@ public class InternalTableLoadJob extends SparkApplication {
         boolean incrementalBuild = 
"true".equals(getParam(NBatchConstants.P_INCREMENTAL_BUILD));
         String startDate = getParam(NBatchConstants.P_START_DATE);
         String endDate = getParam(NBatchConstants.P_END_DATE);
+        String refreshPartitions = 
getParam(NBatchConstants.P_REFRESH_PARTITION_VALUES).replace("[", 
"").replace("]",
+                "");
+        String[] partitions = new String[] {};
+        if (StringUtils.isNotEmpty(refreshPartitions)) {
+            partitions = StringUtils.isEmpty(refreshPartitions) ? new String[] 
{} : refreshPartitions.split(", ");
+        }
         String storagePolicy = config.getGlutenStoragePolicy();
         InternalTableLoader loader = new InternalTableLoader();
-        loader.loadInternalTable(ss, internalTable, new String[] { startDate, 
endDate }, null, storagePolicy,
+        loader.loadInternalTable(ss, internalTable, new String[] { startDate, 
endDate }, partitions, storagePolicy,
                 incrementalBuild);
     }
 
diff --git 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
index efb331e09a..c73a9d7f44 100644
--- 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
+++ 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
@@ -18,9 +18,7 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import java.text.SimpleDateFormat;
 import java.util.List;
-import java.util.Locale;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -74,6 +72,7 @@ public class InternalTableLoadingJob extends 
DefaultExecutableOnTable {
         job.setParam(NBatchConstants.P_END_DATE, param.getEndDate());
         job.setParam(NBatchConstants.P_DELETE_PARTITION_VALUES, 
param.getDeletePartitionValues());
         job.setParam(NBatchConstants.P_DELETE_PARTITION, 
param.getDeletePartition());
+        job.setParam(NBatchConstants.P_REFRESH_PARTITION_VALUES, 
param.getRefreshPartitionValues());
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         StepEnum.BUILD_INTERNAL.create(job, config);
         StepEnum.UPDATE_METADATA.create(job, config);
@@ -114,15 +113,10 @@ public class InternalTableLoadingJob extends 
DefaultExecutableOnTable {
             String tableName = getParam(NBatchConstants.P_TABLE_NAME);
             String startDate = getParam(NBatchConstants.P_START_DATE);
             String endDate = getParam(NBatchConstants.P_END_DATE);
+            String refreshPartitions = 
getParam(NBatchConstants.P_REFRESH_PARTITION_VALUES).replace("[", "")
+                    .replace("]", "");
+            boolean isIncremental = 
"true".equals(getParam(NBatchConstants.P_INCREMENTAL_BUILD));
             InternalTablePartition tablePartition = 
internalTable.getTablePartition();
-            // release current job_range
-            String[] curJobRange = new String[] { "0", "0" };
-            if (null != tablePartition && 
StringUtils.isNotEmpty(tablePartition.getDatePartitionFormat())
-                    && StringUtils.isNotEmpty(startDate)) {
-                SimpleDateFormat fmt = new 
SimpleDateFormat(tablePartition.getDatePartitionFormat(), Locale.ROOT);
-                curJobRange = new String[] { 
fmt.format(Long.parseLong(startDate)),
-                        fmt.format(Long.parseLong(endDate)) };
-            }
             // merge latest partition_range
             logger.info("starting merging delta partitions for internal table 
{}", tableName);
             if (null != tablePartition && 
StringUtils.isNotEmpty(tablePartition.getDatePartitionFormat())) {
@@ -130,12 +124,11 @@ public class InternalTableLoadingJob extends 
DefaultExecutableOnTable {
                         tablePartition.getDatePartitionFormat());
                 internalTable.setPartitionRange(partitionRange);
             }
-            List<String[]> jobRange = internalTable.getJobRange();
-            String[] finalCurJobRange = curJobRange;
-            jobRange.removeIf(rang -> rang[0].equals(finalCurJobRange[0]) && 
rang[1].equals(finalCurJobRange[1]));
-            internalTable.setJobRange(jobRange);
+            // release current job_range
+            InternalTableUpdateMetadataStep metadataStep = new 
InternalTableUpdateMetadataStep();
+            metadataStep.releaseJobRange(internalTable, isIncremental, 
startDate, endDate, refreshPartitions);
             internalTableManager.saveOrUpdateInternalTable(internalTable);
-            logger.info("release job_range for internal table {} , range {}.", 
tableName, jobRange);
+            logger.info("release job_range for internal table {} ", tableName);
             return true;
         }, getProject());
     }
diff --git 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableUpdateMetadataStep.java
 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableUpdateMetadataStep.java
index 46c3c2065a..abf20ba3a7 100644
--- 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableUpdateMetadataStep.java
+++ 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableUpdateMetadataStep.java
@@ -21,6 +21,7 @@ package org.apache.kylin.engine.spark.job;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.engine.spark.builder.InternalTableLoader;
 import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
 import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.job.JobContext;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -92,6 +94,9 @@ public class InternalTableUpdateMetadataStep extends 
AbstractExecutable {
             String project = getParam(NBatchConstants.P_PROJECT_NAME);
             String startDate = getParam(NBatchConstants.P_START_DATE);
             String endDate = getParam(NBatchConstants.P_END_DATE);
+            String refreshPartitions = 
getParam(NBatchConstants.P_REFRESH_PARTITION_VALUES).replace("[", "")
+                    .replace("]", "");
+            boolean isIncremental = 
"true".equals(getParam(NBatchConstants.P_INCREMENTAL_BUILD));
             // fetch delta partition info
             InternalTableManager internalTableManager = 
InternalTableManager.getInstance(config, project);
             InternalTableDesc internalTable = 
internalTableManager.getInternalTableDesc(tableName);
@@ -109,21 +114,7 @@ public class InternalTableUpdateMetadataStep extends 
AbstractExecutable {
                         tablePartition.getDatePartitionFormat());
                 internalTable.setPartitionRange(partitionRange);
             }
-            // release current job_range
-            String[] curJobRange = new String[] { "0", "0" };
-            if (null != tablePartition && 
StringUtils.isNotEmpty(tablePartition.getDatePartitionFormat())
-                    && StringUtils.isNotEmpty(startDate)) {
-                SimpleDateFormat fmt = new 
SimpleDateFormat(tablePartition.getDatePartitionFormat(), Locale.ROOT);
-                curJobRange = new String[] { 
fmt.format(Long.parseLong(startDate)),
-                        fmt.format(Long.parseLong(endDate)) };
-            }
-            List<String[]> jobRange = internalTable.getJobRange();
-            String[] finalCurJobRange = curJobRange;
-            jobRange.removeIf(rang -> rang[0].equals(finalCurJobRange[0]) && 
rang[1].equals(finalCurJobRange[1]));
-            internalTable.setJobRange(jobRange);
-            logger.info("trying to release job_range for internal table {} , 
range {}.",
-                    internalTable.getTableDesc().getTableAlias(), jobRange);
-
+            releaseJobRange(internalTable, isIncremental, startDate, endDate, 
refreshPartitions);
             internalTableManager.saveOrUpdateInternalTable(internalTable);
             logger.info("update metadata for internal table {} cost: {} ms.",
                     internalTable.getTableDesc().getTableAlias(), 
(System.currentTimeMillis() - startTime));
@@ -131,6 +122,36 @@ public class InternalTableUpdateMetadataStep extends 
AbstractExecutable {
         }, project);
     }
 
+    /**
+     * Releases the job_range entries that the current load job holds on the given internal table.
+     *
+     * <p>The ranges to release are collected from up to three sources: the {@code {"0", "0"}} range
+     * when the build is not incremental, the ranges returned by
+     * {@code DataRangeUtils.mergeTimeRange} for the refreshed partitions, and the
+     * {@code startDate}/{@code endDate} pair formatted with the table's date partition format.
+     * Every job_range entry whose two bounds equal one of those ranges is removed, and the
+     * remaining list is set back on {@code internalTable}. The caller is responsible for persisting
+     * the table afterwards.
+     *
+     * @param internalTable     table whose job_range list is modified
+     * @param isIncremental     whether the job ran as an incremental build
+     * @param startDate         range start, parsed as a long and formatted as a date; may be empty
+     * @param endDate           range end, parsed as a long and formatted as a date
+     * @param refreshPartitions refreshed partition values joined by ", "; may be empty
+     * @throws Exception declared by the original signature; presumably raised by
+     *                   {@code DataRangeUtils.mergeTimeRange} (TODO confirm)
+     */
+    public void releaseJobRange(InternalTableDesc internalTable, boolean isIncremental, String startDate,
+            String endDate, String refreshPartitions) throws Exception {
+        InternalTablePartition tablePartition = internalTable.getTablePartition();
+        // Resolve the format once behind a null guard: a table without partition info must skip
+        // both partition-based branches instead of failing with a NullPointerException.
+        String datePartitionFormat = null == tablePartition ? null : tablePartition.getDatePartitionFormat();
+        boolean hasDateFormat = StringUtils.isNotEmpty(datePartitionFormat);
+        // release current job_range
+        logger.info("starting releasing job_range");
+        List<String[]> curRange = Lists.newArrayList();
+        if (!isIncremental) {
+            curRange.add(new String[] { "0", "0" });
+        }
+        if (hasDateFormat && StringUtils.isNotEmpty(refreshPartitions)) {
+            // The values arrive as a ", "-joined string (the caller strips the surrounding brackets).
+            String[] partitions = refreshPartitions.split(", ");
+            curRange.addAll(DataRangeUtils.mergeTimeRange(Arrays.asList(partitions), datePartitionFormat));
+        }
+        if (hasDateFormat && StringUtils.isNotEmpty(startDate)) {
+            SimpleDateFormat fmt = new SimpleDateFormat(datePartitionFormat, Locale.ROOT);
+            curRange.add(new String[] { fmt.format(Long.parseLong(startDate)), fmt.format(Long.parseLong(endDate)) });
+        }
+        List<String[]> jobRange = internalTable.getJobRange();
+        // Drop every held range whose bounds match one of the ranges this job covered.
+        for (String[] released : curRange) {
+            jobRange.removeIf(range -> range[0].equals(released[0]) && range[1].equals(released[1]));
+        }
+        internalTable.setJobRange(jobRange);
+        logger.info("trying to release job_range for internal table {} , range {}.",
+                internalTable.getTableDesc().getTableAlias(), jobRange);
+    }
+
     public InternalTableLoadJob.InternalTableMetaUpdateInfo 
extractUpdateInfo(String project, String tableName,
             KylinConfig config, SparkSession ss) {
         InternalTableManager internalTableManager = 
InternalTableManager.getInstance(config, project);
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
index 218b672dbf..d2927a63e3 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
@@ -25,7 +25,7 @@ import java.util.Locale
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.exception.{CommonErrorCode, KylinException}
-import org.apache.kylin.common.util.{DateFormat, HadoopUtil}
+import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.engine.spark.utils.SparkDataSource._
 import org.apache.kylin.metadata.cube.model.NBatchConstants
 import org.apache.kylin.metadata.table.InternalTableDesc
@@ -102,9 +102,6 @@ class InternalTableLoader extends Logging {
     }
     val format = table.getStorageType.getFormat
     if (incremental) {
-      val dateFormat = table.getTablePartition.getDatePartitionFormat
-      logInfo(f"Refresh dynamic partitions 
[${DateFormat.formatToDateStr(startDate.toLong, dateFormat)}," +
-        f" ${DateFormat.formatToDateStr(endDate.toLong, dateFormat)})")
       ss.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
     }
     writer.format(format).mode(outPutMode).save(location)


Reply via email to