This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new a69c8fdcc2a [Feature](WIP) Add Create StreamingJob and Alter Job
(#55814)
a69c8fdcc2a is described below
commit a69c8fdcc2aad853ab3313a1e2fad50b95321e69
Author: wudi <[email protected]>
AuthorDate: Tue Sep 9 17:16:09 2025 +0800
[Feature](WIP) Add Create StreamingJob and Alter Job (#55814)
### What problem does this PR solve?
1. Add Create StreamingJob and Alter Job
2. Add job and task TVF schema
3. Add offset API
---
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 13 +-
.../org/apache/doris/job/base/JobProperties.java | 25 ++++
.../doris/job/extensions/insert/InsertJob.java | 31 +++++
.../doris/job/extensions/insert/InsertTask.java | 5 +-
.../insert/streaming/StreamingInsertJob.java | 25 ++++
.../insert/streaming/StreamingJobProperties.java | 64 +++++++++
.../org/apache/doris/job/manager/JobManager.java | 23 ++++
.../java/org/apache/doris/job/offset/Offset.java | 22 +++
.../doris/job/offset/SourceOffsetProvider.java | 62 +++++++++
.../job/offset/SourceOffsetProviderFactory.java | 37 +++++
.../org/apache/doris/job/offset/s3/S3Offset.java | 36 +++++
.../job/offset/s3/S3SourceOffsetProvider.java | 53 ++++++++
.../doris/nereids/parser/LogicalPlanBuilder.java | 14 +-
.../apache/doris/nereids/trees/plans/PlanType.java | 1 +
.../trees/plans/commands/AlterJobCommand.java | 151 +++++++++++++++++++++
.../trees/plans/commands/info/CreateJobInfo.java | 64 ++++++++-
.../trees/plans/visitor/CommandVisitor.java | 5 +
17 files changed, 619 insertions(+), 12 deletions(-)
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index d8f5f810eb0..a0ae7c6916e 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -99,16 +99,19 @@ materializedViewStatement
| SHOW CREATE MATERIALIZED VIEW mvName=multipartIdentifier
#showCreateMTMV
;
supportedJobStatement
- : CREATE JOB label=multipartIdentifier ON SCHEDULE
- (
+ : CREATE JOB label=multipartIdentifier propertyClause?
+ ON (STREAMING | SCHEDULE(
(EVERY timeInterval=INTEGER_VALUE timeUnit=identifier
(STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))?
(ENDS endsTime=STRING_LITERAL)?)
|
- (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP)))
- commentSpec?
- DO supportedDmlStatement
#createScheduledJob
+ (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP))
+ )
+ )
+ commentSpec?
+ DO supportedDmlStatement
#createScheduledJob
| PAUSE JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL)
#pauseJob
+ | ALTER JOB FOR (jobNameKey=identifier) (propertyClause |
supportedDmlStatement | propertyClause supportedDmlStatement)
#alterJob
| DROP JOB (IF EXISTS)? WHERE (jobNameKey=identifier) EQ
(jobNameValue=STRING_LITERAL)
#dropJob
| RESUME JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL)
#resumeJob
| CANCEL TASK WHERE (jobNameKey=identifier) EQ
(jobNameValue=STRING_LITERAL) AND (taskIdKey=identifier) EQ
(taskIdValue=INTEGER_VALUE) #cancelJobTask
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java
new file mode 100644
index 00000000000..3985b59bf16
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.base;
+
+import org.apache.doris.common.AnalysisException;
+
+/**
+ * Common contract for the property set attached to a job.
+ */
+public interface JobProperties {
+    /**
+     * Validates the properties. This default implementation performs no checks.
+     *
+     * @throws AnalysisException declared so that overriding implementations can reject values
+     */
+    default void validate() throws AnalysisException {
+        // Intentionally empty: implementations override this to add their own checks.
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index dd0e0d3a228..5b386886b19 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -51,6 +51,8 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.ErrorTabletInfo;
import org.apache.doris.transaction.TabletCommitInfo;
@@ -96,6 +98,11 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
.add(new Column("CreateTime", ScalarType.createStringType()))
.addAll(COMMON_SCHEMA)
.add(new Column("Comment", ScalarType.createStringType()))
+ // only execute type = streaming need record
+ .add(new Column("Progress", ScalarType.createStringType()))
+ .add(new Column("RemoteOffset", ScalarType.createStringType()))
+ .add(new Column("LoadStatistic", ScalarType.createStringType()))
+ .add(new Column("ErrorMsg", ScalarType.createStringType()))
.build();
private static final ShowResultSetMetaData TASK_META_DATA =
@@ -112,6 +119,8 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
.addColumn(new Column("TrackingUrl",
ScalarType.createVarchar(200)))
.addColumn(new Column("LoadStatistic",
ScalarType.createVarchar(200)))
.addColumn(new Column("User",
ScalarType.createVarchar(50)))
+ // only execute type = streaming need record
+ .addColumn(new Column("Offset",
ScalarType.createStringType()))
.build();
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@@ -523,6 +532,28 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
}
}
+    /**
+     * Builds the row exposed for this job through the jobs table-valued function.
+     *
+     * <p>Every cell is emitted as a string, in the order listed below.
+     *
+     * @return a row with one string cell per value
+     */
+    @Override
+    public TRow getTvfInfo() {
+        // Values are listed in output order; each one becomes a string cell.
+        String[] cells = {
+            String.valueOf(getJobId()),
+            getJobName(),
+            getCreateUser().getQualifiedUser(),
+            getJobConfig().getExecuteType().name(),
+            getJobConfig().convertRecurringStrategyToString(),
+            getJobStatus().name(),
+            getExecuteSql(),
+            TimeUtils.longToTimeString(getCreateTimeMs()),
+            String.valueOf(getSucceedTaskCount().get()),
+            String.valueOf(getFailedTaskCount().get()),
+            String.valueOf(getCanceledTaskCount().get()),
+            getComment(),
+            // Two placeholder cells; presumably Progress and RemoteOffset, which the
+            // schema marks as streaming-only - TODO confirm against the schema order.
+            FeConstants.null_string,
+            FeConstants.null_string,
+            loadStatistic.toJson(),
+            failMsg == null ? FeConstants.null_string : failMsg.getMsg(),
+        };
+        TRow row = new TRow();
+        for (String cell : cells) {
+            row.addToColumnValue(new TCell().setStringVal(cell));
+        }
+        return row;
+    }
+
@Override
public String formatMsgWhenExecuteQueueFull(Long taskId) {
return commonFormatMsgWhenExecuteQueueFull(taskId,
"insert_task_queue_size",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index f6633d6c5e8..34cfdf6edea 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -67,7 +67,8 @@ public class InsertTask extends AbstractTask {
new Column("FinishTime", ScalarType.createStringType()),
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
- new Column("User", ScalarType.createStringType()));
+ new Column("User", ScalarType.createStringType()),
+ new Column("Offset", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@@ -272,6 +273,7 @@ public class InsertTask extends AbstractTask {
} else {
trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
}
+ trow.addToColumnValue(new TCell().setStringVal(""));
return trow;
}
@@ -292,6 +294,7 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
+ trow.addToColumnValue(new TCell().setStringVal(""));
return trow;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 9e5baccd134..a2016c52cb9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -17,13 +17,16 @@
package org.apache.doris.job.extensions.insert.streaming;
+import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.common.io.Text;
import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.PauseReason;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.ConnectContext;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
@@ -37,6 +40,8 @@ import java.util.Map;
public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask,
Map<Object, Object>> {
+ @SerializedName("did")
+ private final long dbId;
@Getter
@SerializedName("st")
protected JobStatus status;
@@ -52,6 +57,26 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@Setter
protected long autoResumeCount;
+ @Getter
+ @SerializedName("jp")
+ private StreamingJobProperties jobProperties;
+
+    /**
+     * Creates a streaming insert job with a freshly allocated job id.
+     *
+     * <p>The database id is read from the current {@link ConnectContext}, so a connect context
+     * must be available on the calling thread.
+     *
+     * @param jobName       name of the job
+     * @param jobStatus     initial status
+     * @param dbName        name of the database the job belongs to
+     * @param comment       user comment
+     * @param createUser    identity of the creating user
+     * @param jobConfig     execution configuration
+     * @param createTimeMs  creation timestamp in milliseconds
+     * @param executeSql    SQL text the job executes
+     * @param jobProperties streaming-specific properties
+     */
+    public StreamingInsertJob(String jobName,
+                              JobStatus jobStatus,
+                              String dbName,
+                              String comment,
+                              UserIdentity createUser,
+                              JobExecutionConfiguration jobConfig,
+                              Long createTimeMs,
+                              String executeSql,
+                              StreamingJobProperties jobProperties) {
+        super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
+                jobConfig, createTimeMs, executeSql);
+        this.jobProperties = jobProperties;
+        this.dbId = ConnectContext.get().getCurrentDbId();
+    }
+
+
@Override
public void updateJobStatus(JobStatus status) throws JobException {
super.updateJobStatus(status);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
new file mode 100644
index 00000000000..25d256b127b
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.job.base.JobProperties;
+
+import lombok.Data;
+
+import java.util.Map;
+
+/**
+ * Properties accepted by a streaming insert job.
+ *
+ * <p>The raw key/value map is kept as supplied. The typed fields are assigned by
+ * {@link #validate()}, which applies defaults for missing keys and rejects out-of-range values.
+ */
+@Data
+public class StreamingJobProperties implements JobProperties {
+    public static final String MAX_INTERVAL_SECOND_PROPERTY = "max_interval";
+    public static final String S3_BATCH_FILES_PROPERTY = "s3.batch_files";
+    public static final String S3_BATCH_SIZE_PROPERTY = "s3.batch_size";
+    public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
+    public static final long DEFAULT_S3_BATCH_FILES = 256;
+    public static final long DEFAULT_S3_BATCH_SIZE = 10 * 1024 * 1024 * 1024L; // 10GB
+    public static final long DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min
+
+    // Inclusive bounds for s3.batch_size.
+    private static final long MIN_S3_BATCH_SIZE = 100 * 1024 * 1024L; // 100MB
+    private static final long MAX_S3_BATCH_SIZE = 10 * 1024 * 1024 * 1024L; // 10GB
+
+    private final Map<String, String> properties;
+    private long maxIntervalSecond;
+    private long s3BatchFiles;
+    private long s3BatchSize;
+
+    /**
+     * @param jobProperties raw properties as written by the user; stored by reference, not copied
+     */
+    public StreamingJobProperties(Map<String, String> jobProperties) {
+        this.properties = jobProperties;
+    }
+
+    /**
+     * Parses the raw properties into the typed fields, applying defaults for missing keys.
+     *
+     * @throws AnalysisException if a supplied value fails its range check
+     */
+    @Override
+    public void validate() throws AnalysisException {
+        // The error text now matches the predicate: 1 is an accepted value.
+        this.maxIntervalSecond = Util.getLongPropertyOrDefault(
+                properties.get(MAX_INTERVAL_SECOND_PROPERTY),
+                DEFAULT_MAX_INTERVAL_SECOND, (v) -> v >= 1,
+                MAX_INTERVAL_SECOND_PROPERTY + " should >= 1");
+
+        this.s3BatchFiles = Util.getLongPropertyOrDefault(
+                properties.get(S3_BATCH_FILES_PROPERTY),
+                DEFAULT_S3_BATCH_FILES, (v) -> v >= 1,
+                S3_BATCH_FILES_PROPERTY + " should >= 1");
+
+        this.s3BatchSize = Util.getLongPropertyOrDefault(
+                properties.get(S3_BATCH_SIZE_PROPERTY),
+                DEFAULT_S3_BATCH_SIZE, (v) -> v >= MIN_S3_BATCH_SIZE && v <= MAX_S3_BATCH_SIZE,
+                S3_BATCH_SIZE_PROPERTY + " should between 100MB and 10GB");
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 3d705b945e7..e763f8de590 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
@@ -219,6 +220,17 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
jobMap.get(jobId).logUpdateOperation();
}
+    /**
+     * Stores {@code job} in the job map under its own id and writes an update log for it.
+     *
+     * <p>The map update and the log write both happen while the write lock is held.
+     *
+     * @param job the job instance to register under its id
+     */
+    public void alterJob(T job) {
+        final Long jobId = job.getJobId();
+        writeLock();
+        try {
+            jobMap.put(jobId, job);
+            job.logUpdateOperation();
+        } finally {
+            writeUnlock();
+        }
+        log.info("update job success, jobId: {}", jobId);
+    }
+
public void alterJobStatus(String jobName, JobStatus jobStatus) throws
JobException {
for (T a : jobMap.values()) {
if (a.getJobName().equals(jobName)) {
@@ -349,6 +361,9 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
*/
public void cancelTaskById(String jobName, Long taskId) throws
JobException {
for (T job : jobMap.values()) {
+ if
(job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)) {
+ throw new JobException("streaming job not support cancel task
by id");
+ }
if (job.getJobName().equals(jobName)) {
job.cancelTaskById(taskId);
job.logUpdateOperation();
@@ -392,6 +407,14 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
return jobMap.get(jobId);
}
+    /**
+     * Finds the first registered job whose name equals {@code jobName}.
+     *
+     * @param jobName name to look for
+     * @return the matching job
+     * @throws JobException if no registered job has that name
+     */
+    public T getJobByName(String jobName) throws JobException {
+        return jobMap.values().stream()
+                .filter(candidate -> candidate.getJobName().equals(jobName))
+                .findFirst()
+                .orElseThrow(() -> new JobException("job not exist, jobName:" + jobName));
+    }
/**
* get load info by db
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
new file mode 100644
index 00000000000..095f0a5e6bf
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset;
+
+/**
+ * A position in a data source that can be rendered as JSON.
+ */
+public interface Offset {
+    /**
+     * @return this offset encoded as a JSON string
+     */
+    String toJson();
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
new file mode 100644
index 00000000000..f88079617de
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset;
+
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+
+/**
+ * Manages the offsets and metadata of one data source.
+ */
+public interface SourceOffsetProvider {
+    /**
+     * Identifies the kind of source this provider handles.
+     *
+     * @return the source type, e.g. {@code s3} or {@code kafka}
+     */
+    String getSourceType();
+
+    /**
+     * Determines what should be consumed next.
+     *
+     * @return the next offset to consume
+     */
+    Offset getNextOffset();
+
+    /**
+     * Rewrites the TVF parameters of an insert command according to the current offset.
+     *
+     * @param command the insert command whose TVF parameters are to be rewritten
+     * @return the rewritten command
+     */
+    InsertIntoTableCommand rewriteTvfParamsInCommand(InsertIntoTableCommand command);
+
+    /**
+     * Records progress for the source.
+     *
+     * @param offset the offset to record as progress
+     */
+    void updateProgress(Offset offset);
+
+    /**
+     * Fetches remote metadata, such as the file listing in S3 or the latest offsets in Kafka.
+     */
+    void fetchRemoteMeta();
+
+    /**
+     * Reports whether the source still has unconsumed data.
+     *
+     * @return {@code true} if there is more data to consume
+     */
+    boolean hasMoreData();
+}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
new file mode 100644
index 00000000000..5ba1d903d78
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset;
+
+import org.apache.doris.job.offset.s3.S3SourceOffsetProvider;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Creates {@link SourceOffsetProvider} instances from a source type name.
+ */
+public class SourceOffsetProviderFactory {
+    // Keys are registered in lower case; lookups normalise the requested type the same way.
+    private static final Map<String, Class<? extends SourceOffsetProvider>> map = new ConcurrentHashMap<>();
+
+    static {
+        map.put("s3", S3SourceOffsetProvider.class);
+    }
+
+    /**
+     * Instantiates the provider registered for {@code sourceType}. The lookup is case-insensitive.
+     *
+     * @param sourceType source type name, e.g. {@code s3}
+     * @return a new provider instance
+     * @throws IllegalArgumentException if {@code sourceType} is null or no provider is registered for it
+     * @throws InstantiationException if the provider class cannot be instantiated
+     * @throws IllegalAccessException if the provider's no-arg constructor is not accessible
+     */
+    public static SourceOffsetProvider createSourceOffsetProvider(String sourceType)
+            throws InstantiationException, IllegalAccessException {
+        if (sourceType == null) {
+            throw new IllegalArgumentException("source type must not be null");
+        }
+        // The registry key is "s3"; an upper-cased lookup key could never match it.
+        Class<? extends SourceOffsetProvider> cla = map.get(sourceType.toLowerCase(java.util.Locale.ROOT));
+        if (cla == null) {
+            throw new IllegalArgumentException("unsupported source type: " + sourceType);
+        }
+        return cla.newInstance();
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
new file mode 100644
index 00000000000..86ff467796a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.s3;
+
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import java.util.List;
+
+/**
+ * {@link Offset} implementation for S3 sources.
+ */
+public class S3Offset implements Offset {
+    // NOTE(review): the field names suggest a file range plus its listing - confirm the
+    // intended meaning once the provider is implemented.
+    String startFile;
+    String endFile;
+    List<String> fileLists;
+
+    /**
+     * @return this object serialised with the shared Gson instance
+     */
+    @Override
+    public String toJson() {
+        final String json = GsonUtils.GSON.toJson(this);
+        return json;
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
new file mode 100644
index 00000000000..087d9c2beb7
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.s3;
+
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.offset.SourceOffsetProvider;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+
+/**
+ * {@link SourceOffsetProvider} for S3.
+ *
+ * <p>Every method here is currently a placeholder: those returning an object return
+ * {@code null}, {@link #hasMoreData()} returns {@code false}, and the void methods do nothing.
+ */
+public class S3SourceOffsetProvider implements SourceOffsetProvider {
+
+    /** Placeholder: no source type is reported yet. */
+    @Override
+    public String getSourceType() {
+        return null;
+    }
+
+    /** Placeholder: no offset is computed yet. */
+    @Override
+    public Offset getNextOffset() {
+        return null;
+    }
+
+    /** Placeholder: the command is not rewritten and {@code null} is returned. */
+    @Override
+    public InsertIntoTableCommand rewriteTvfParamsInCommand(InsertIntoTableCommand command) {
+        return null;
+    }
+
+    /** Placeholder: progress is not recorded. */
+    @Override
+    public void updateProgress(Offset offset) {
+        // not implemented yet
+    }
+
+    /** Placeholder: nothing is fetched. */
+    @Override
+    public void fetchRemoteMeta() {
+        // not implemented yet
+    }
+
+    /** Placeholder: always reports that there is no more data. */
+    @Override
+    public boolean hasMoreData() {
+        return false;
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index abb468a19cc..80a0886017e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -608,6 +608,7 @@ import
org.apache.doris.nereids.trees.plans.commands.AlterCatalogRenameCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand;
import
org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand;
+import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterResourceCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterRoleCommand;
@@ -1144,11 +1145,13 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
Optional<Long> interval = ctx.timeInterval == null ? Optional.empty() :
Optional.of(Long.valueOf(ctx.timeInterval.getText()));
Optional<String> intervalUnit = ctx.timeUnit == null ?
Optional.empty() : Optional.of(ctx.timeUnit.getText());
+ Map<String, String> properties = ctx.propertyClause() != null
+ ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) :
Maps.newHashMap();
String comment =
visitCommentSpec(ctx.commentSpec());
String executeSql = getOriginSql(ctx.supportedDmlStatement());
CreateJobInfo createJobInfo = new CreateJobInfo(label, atTime,
interval, intervalUnit, startTime,
- endsTime, immediateStartOptional, comment, executeSql);
+ endsTime, immediateStartOptional, comment, executeSql,
ctx.STREAMING() != null, properties);
return new CreateJobCommand(createJobInfo);
}
@@ -1158,6 +1161,15 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
}
}
+    /**
+     * Builds an {@link AlterJobCommand} from an ALTER JOB statement.
+     *
+     * @param ctx parse context of the ALTER JOB statement
+     * @return the command carrying the job name, the properties (empty when no property clause
+     *         is present) and the original text of the DML statement
+     */
+    @Override
+    public LogicalPlan visitAlterJob(DorisParser.AlterJobContext ctx) {
+        String jobName = stripQuotes(ctx.jobNameKey.getText());
+        checkJobNameKey(jobName, JOB_NAME, ctx);
+
+        Map<String, String> properties;
+        if (ctx.propertyClause() == null) {
+            properties = Maps.newHashMap();
+        } else {
+            properties = Maps.newHashMap(visitPropertyClause(ctx.propertyClause()));
+        }
+
+        // NOTE(review): the grammar allows ALTER JOB with only a property clause, in which case
+        // supportedDmlStatement() is null - confirm getOriginSql tolerates a null context.
+        String executeSql = getOriginSql(ctx.supportedDmlStatement());
+        return new AlterJobCommand(jobName, properties, executeSql);
+    }
+
@Override
public LogicalPlan visitPauseJob(DorisParser.PauseJobContext ctx) {
checkJobNameKey(stripQuotes(ctx.jobNameKey.getText()), JOB_NAME, ctx);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index e150899f95c..367e6d641e0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -162,6 +162,7 @@ public enum PlanType {
CREATE_MTMV_COMMAND,
CREATE_MATERIALIZED_VIEW_COMMAND,
CREATE_JOB_COMMAND,
+ ALTER_JOB_COMMAND,
PAUSE_JOB_COMMAND,
CANCEL_JOB_COMMAND,
DROP_CATALOG_COMMAND,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
new file mode 100644
index 00000000000..ef8c9bb8b7a
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.IntervalUnit;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+
+/**
+ * Command for {@code ALTER JOB}: replaces the properties and/or the SQL of an existing job.
+ *
+ * <p>Only streaming insert jobs in {@code PAUSED} state can be altered, and the statement must
+ * change at least one of the job's properties or its SQL.
+ */
+public class AlterJobCommand extends AlterCommand implements ForwardWithSync {
+    // exclude job name prefix, which is used by inner job
+    private static final String excludeJobNamePrefix = "inner_";
+    private final String jobName;
+    // properties given in the statement; null or empty means "keep the job's current properties"
+    private final Map<String, String> properties;
+    // SQL given in the statement; null or empty means "keep the job's current SQL"
+    private final String sql;
+
+    /**
+     * Creates the command.
+     *
+     * @param jobName name of the job to alter
+     * @param properties new job properties; may be null or empty when only the SQL changes
+     * @param sql new job SQL; may be null or empty when only the properties change
+     */
+    public AlterJobCommand(String jobName, Map<String, String> properties, String sql) {
+        super(PlanType.ALTER_JOB_COMMAND);
+        this.jobName = jobName;
+        this.properties = properties;
+        this.sql = sql;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    @Override
+    public StmtType stmtType() {
+        return StmtType.ALTER;
+    }
+
+    /**
+     * Validates the request, builds the replacement job and hands it to the job manager.
+     */
+    @Override
+    public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        validate();
+        AbstractJob job = analyzeAndBuildJobInfo(ctx);
+        ctx.getEnv().getJobManager().alterJob(job);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitAlterJobCommand(this, context);
+    }
+
+    /**
+     * Builds the job that replaces the current one. Name, status, database and comment are
+     * carried over from the existing job; SQL and properties come from this command when
+     * given, otherwise from the existing job.
+     *
+     * @throws JobException if the job does not exist or is not a streaming insert job
+     */
+    private AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws JobException {
+        AbstractJob job = Env.getCurrentEnv().getJobManager().getJobByName(jobName);
+        if (job == null) {
+            throw new JobException("Job not found: " + jobName);
+        }
+        if (!(job instanceof StreamingInsertJob)) {
+            throw new JobException("Unsupported job type for ALTER:" + job.getJobType());
+        }
+        StreamingInsertJob originJob = (StreamingInsertJob) job;
+        String updateSQL = StringUtils.isEmpty(sql) ? originJob.getExecuteSql() : sql;
+        Map<String, String> updateProps = properties == null || properties.isEmpty()
+                ? originJob.getJobProperties().getProperties() : properties;
+        StreamingJobProperties streamJobProps = new StreamingJobProperties(updateProps);
+        // rebuild time definition: the interval follows the (possibly new) properties,
+        // the start time is kept from the existing job
+        // NOTE(review): this appears to update the existing job's configuration object in
+        // place before alterJob runs - confirm that is intended
+        JobExecutionConfiguration execConfig = originJob.getJobConfig();
+        TimerDefinition timerDefinition = new TimerDefinition();
+        timerDefinition.setInterval(streamJobProps.getMaxIntervalSecond());
+        timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
+        timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs());
+        execConfig.setTimerDefinition(timerDefinition);
+        return new StreamingInsertJob(jobName,
+                job.getJobStatus(),
+                job.getCurrentDbName(),
+                job.getComment(),
+                ConnectContext.get().getCurrentUserIdentity(),
+                execConfig,
+                System.currentTimeMillis(),
+                updateSQL,
+                streamJobProps);
+    }
+
+    /**
+     * Rejects the request unless the target is an existing, paused, non-inner streaming
+     * insert job and the statement changes its properties or its SQL.
+     */
+    private void validate() throws Exception {
+        if (jobName.startsWith(excludeJobNamePrefix)) {
+            throw new AnalysisException("Can't alter inner job");
+        }
+        AbstractJob job = Env.getCurrentEnv().getJobManager().getJobByName(jobName);
+        // defensive: fail with a clear message instead of a NullPointerException below
+        if (job == null) {
+            throw new AnalysisException("Job not found: " + jobName);
+        }
+        if (!JobStatus.PAUSED.equals(job.getJobStatus())) {
+            throw new AnalysisException("Only PAUSED job can be altered");
+        }
+
+        boolean streaming = job instanceof StreamingInsertJob
+                && JobExecuteType.STREAMING.equals(job.getJobConfig().getExecuteType());
+        if (!streaming) {
+            throw new AnalysisException("Unsupported job type for ALTER:" + job.getJobType());
+        }
+        StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+        boolean propsChanged = checkProperties(streamingJob.getJobProperties().getProperties());
+        boolean sqlChanged = checkSql(streamingJob.getExecuteSql());
+        if (!propsChanged && !sqlChanged) {
+            throw new AnalysisException("No properties or sql changed in ALTER JOB");
+        }
+    }
+
+    /**
+     * Returns true when the statement supplies properties that differ from the job's current ones.
+     *
+     * @param originProps the job's current properties
+     */
+    private boolean checkProperties(Map<String, String> originProps) {
+        if (properties == null || properties.isEmpty()) {
+            // nothing supplied: the current properties are kept, so nothing changes
+            return false;
+        }
+        return !properties.equals(originProps);
+    }
+
+    /**
+     * Returns true when the statement supplies a SQL that differs from the job's current one.
+     *
+     * @param originSql the job's current SQL
+     */
+    private boolean checkSql(String originSql) {
+        if (StringUtils.isEmpty(sql)) {
+            // nothing supplied: the current SQL is kept, so nothing changes
+            return false;
+        }
+        return !sql.equals(originSql);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
index 6cef7ee89ec..0d52e23ece5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
@@ -27,10 +27,13 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.JobProperties;
import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.parser.NereidsParser;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
@@ -39,6 +42,7 @@ import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
+import java.util.Map;
import java.util.Optional;
/**
@@ -66,6 +70,8 @@ public class CreateJobInfo {
private final String comment;
private final String executeSql;
+ private final boolean streamingJob;
+ private final Map<String, String> jobProperties;
/**
* Constructor for CreateJobInfo.
@@ -83,7 +89,8 @@ public class CreateJobInfo {
public CreateJobInfo(Optional<String> labelNameOptional, Optional<String>
onceJobStartTimestampOptional,
Optional<Long> intervalOptional, Optional<String>
intervalTimeUnitOptional,
Optional<String> startsTimeStampOptional,
Optional<String> endsTimeStampOptional,
- Optional<Boolean> immediateStartOptional, String
comment, String executeSql) {
+ Optional<Boolean> immediateStartOptional, String
comment, String executeSql,
+ boolean streamingJob, Map<String, String>
jobProperties) {
this.labelNameOptional = labelNameOptional;
this.onceJobStartTimestampOptional = onceJobStartTimestampOptional;
this.intervalOptional = intervalOptional;
@@ -93,7 +100,8 @@ public class CreateJobInfo {
this.immediateStartOptional = immediateStartOptional;
this.comment = comment;
this.executeSql = executeSql;
-
+ this.streamingJob = streamingJob;
+ this.jobProperties = jobProperties;
}
/**
@@ -117,16 +125,32 @@ public class CreateJobInfo {
// check its insert stmt,currently only support insert stmt
JobExecutionConfiguration jobExecutionConfiguration = new
JobExecutionConfiguration();
JobExecuteType executeType = intervalOptional.isPresent() ?
JobExecuteType.RECURRING : JobExecuteType.ONE_TIME;
+ JobProperties properties = null;
+ if (streamingJob) {
+ executeType = JobExecuteType.STREAMING;
+ properties = new StreamingJobProperties(jobProperties);
+ }
jobExecutionConfiguration.setExecuteType(executeType);
- TimerDefinition timerDefinition = new TimerDefinition();
+ TimerDefinition timerDefinition = new TimerDefinition();
if (executeType.equals(JobExecuteType.ONE_TIME)) {
buildOnceJob(timerDefinition, jobExecutionConfiguration);
+ } else if (executeType.equals(JobExecuteType.STREAMING)) {
+ buildStreamingJob(timerDefinition, properties);
} else {
buildRecurringJob(timerDefinition, jobExecutionConfiguration);
}
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
- return analyzeAndCreateJob(executeSql, dbName,
jobExecutionConfiguration);
+ return analyzeAndCreateJob(executeSql, dbName,
jobExecutionConfiguration, properties);
+ }
+
+    /**
+     * Fills the timer definition of a streaming job: the interval is the job's max interval in
+     * seconds and the start time is "now".
+     *
+     * @param timerDefinition timer definition to populate
+     * @param props job properties; must be a {@link StreamingJobProperties}
+     * @throws AnalysisException if the properties fail validation
+     */
+    private void buildStreamingJob(TimerDefinition timerDefinition, JobProperties props)
+            throws AnalysisException {
+        StreamingJobProperties properties = (StreamingJobProperties) props;
+        // validate before reading any derived value, so invalid input is reported by
+        // validate() rather than by whichever getter happens to be read first
+        properties.validate();
+        timerDefinition.setInterval(properties.getMaxIntervalSecond());
+        timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
+        timerDefinition.setStartTimeMs(System.currentTimeMillis());
     }
/**
@@ -210,7 +234,17 @@ public class CreateJobInfo {
* @throws UserException if there is an error during SQL analysis or job
creation
*/
private AbstractJob analyzeAndCreateJob(String sql, String currentDbName,
- JobExecutionConfiguration
jobExecutionConfiguration) throws UserException {
+ JobExecutionConfiguration
jobExecutionConfiguration,
+ JobProperties properties) throws
UserException {
+ if
(jobExecutionConfiguration.getExecuteType().equals(JobExecuteType.STREAMING)) {
+ return analyzeAndCreateStreamingInsertJob(sql, currentDbName,
jobExecutionConfiguration, properties);
+ } else {
+ return analyzeAndCreateInsertJob(sql, currentDbName,
jobExecutionConfiguration);
+ }
+ }
+
+ private AbstractJob analyzeAndCreateInsertJob(String sql, String
currentDbName,
+ JobExecutionConfiguration jobExecutionConfiguration) throws
UserException {
NereidsParser parser = new NereidsParser();
LogicalPlan logicalPlan = parser.parseSingle(sql);
if (logicalPlan instanceof InsertIntoTableCommand) {
@@ -234,6 +268,26 @@ public class CreateJobInfo {
}
}
+    /**
+     * Parses {@code sql} and, when it is an insert statement, wraps it in a
+     * {@link StreamingInsertJob} created in {@code RUNNING} status.
+     *
+     * @param sql the job SQL
+     * @param currentDbName database the job is created in
+     * @param jobExecutionConfiguration execution configuration of the job
+     * @param properties job properties; must be a {@link StreamingJobProperties}
+     * @throws UserException if the SQL is not an insert statement
+     */
+    private AbstractJob analyzeAndCreateStreamingInsertJob(String sql, String currentDbName,
+            JobExecutionConfiguration jobExecutionConfiguration, JobProperties properties)
+            throws UserException {
+        LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql);
+        // currently only insert statements are accepted
+        if (!(parsedPlan instanceof InsertIntoTableCommand)) {
+            throw new AnalysisException("Not support this sql : " + sql + " Command class is "
+                    + parsedPlan.getClass().getName() + ".");
+        }
+        return new StreamingInsertJob(labelNameOptional.get(),
+                JobStatus.RUNNING,
+                currentDbName,
+                comment,
+                ConnectContext.get().getCurrentUserIdentity(),
+                jobExecutionConfiguration,
+                System.currentTimeMillis(),
+                sql,
+                (StreamingJobProperties) properties);
+    }
+
private void checkJobName(String jobName) throws AnalysisException {
if (Strings.isNullOrEmpty(jobName)) {
throw new AnalysisException("job name can not be null");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index a5f9a00be1d..45abfa0c250 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -37,6 +37,7 @@ import
org.apache.doris.nereids.trees.plans.commands.AlterCatalogRenameCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand;
import
org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand;
+import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterJobStatusCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterResourceCommand;
@@ -387,6 +388,10 @@ public interface CommandVisitor<R, C> {
return visitCommand(createJobCommand, context);
}
+    /** Visits an {@link AlterJobCommand}; by default delegates to {@code visitCommand}. */
+    default R visitAlterJobCommand(AlterJobCommand alterJobCommand, C context) {
+        return visitCommand(alterJobCommand, context);
+    }
+
default R visitCreateFileCommand(CreateFileCommand createFileCommand, C
context) {
return visitCommand(createFileCommand, context);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]