This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new 33902107d3c [Feature](WIP) Add create job case and fix job bug (#56119)
33902107d3c is described below
commit 33902107d3cd2ef387d6847f62f0ea3f2174d40c
Author: wudi <[email protected]>
AuthorDate: Wed Sep 17 09:47:00 2025 +0800
[Feature](WIP) Add create job case and fix job bug (#56119)
### What problem does this PR solve?
Add a regression test case for creating streaming insert jobs, and fix several streaming job bugs (offset handling, property persistence, task counters, and SHOW output).
---
.../property/storage/StorageProperties.java | 6 +-
.../main/java/org/apache/doris/fs/FileSystem.java | 2 +-
.../java/org/apache/doris/fs/obj/S3ObjStorage.java | 16 ++--
.../org/apache/doris/fs/remote/RemoteFile.java | 18 +++++
.../org/apache/doris/fs/remote/S3FileSystem.java | 2 +-
.../org/apache/doris/job/base/AbstractJob.java | 6 +-
.../org/apache/doris/job/base/JobProperties.java | 3 -
.../insert/streaming/StreamingInsertJob.java | 55 +++++++++----
.../insert/streaming/StreamingInsertTask.java | 16 +++-
.../insert/streaming/StreamingJobProperties.java | 8 +-
.../doris/job/offset/SourceOffsetProvider.java | 2 +-
.../org/apache/doris/job/offset/s3/S3Offset.java | 2 +-
.../job/offset/s3/S3SourceOffsetProvider.java | 60 +++++++-------
.../trees/plans/commands/AlterJobCommand.java | 23 ++----
.../trees/plans/commands/info/CreateJobInfo.java | 24 +++---
.../commands/insert/InsertIntoTableCommand.java | 7 ++
.../org/apache/doris/catalog/DropFunctionTest.java | 29 ++++++-
.../streaming_job/test_streaming_insert_job.out | Bin 0 -> 364 bytes
.../streaming_job/test_streaming_insert_job.groovy | 86 +++++++++++++++++++++
19 files changed, 266 insertions(+), 99 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 2ce87f9ffb9..422d20a7560 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -159,8 +159,6 @@ public abstract class StorageProperties extends ConnectionProperties {
     private static final List<Function<Map<String, String>, StorageProperties>> PROVIDERS =
             Arrays.asList(
-                    props -> (isFsSupport(props, FS_HDFS_SUPPORT)
-                            || HdfsProperties.guessIsMe(props)) ? new HdfsProperties(props) : null,
                     props -> ((isFsSupport(props, FS_OSS_HDFS_SUPPORT) || isFsSupport(props, DEPRECATED_OSS_HDFS_SUPPORT))
                             || OSSHdfsProperties.guessIsMe(props)) ? new OSSHdfsProperties(props) : null,
@@ -181,7 +179,9 @@ public abstract class StorageProperties extends ConnectionProperties {
                     props -> (isFsSupport(props, FS_BROKER_SUPPORT)
                             || BrokerProperties.guessIsMe(props)) ? new BrokerProperties(props) : null,
                     props -> (isFsSupport(props, FS_LOCAL_SUPPORT)
-                            || LocalProperties.guessIsMe(props)) ? new LocalProperties(props) : null
+                            || LocalProperties.guessIsMe(props)) ? new LocalProperties(props) : null,
+                    props -> (isFsSupport(props, FS_HDFS_SUPPORT)
+                            || HdfsProperties.guessIsMe(props)) ? new HdfsProperties(props) : null
             );

     protected StorageProperties(Type type, Map<String, String> origProps) {
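
Note: the hunk above moves the HDFS factory from the head of PROVIDERS to the tail. The chain is first-match-wins, so a broadly matching guess such as HdfsProperties.guessIsMe must be tried after the more specific providers. A minimal standalone sketch of that pattern (hypothetical guess keys, not Doris code):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    class ProviderChainSketch {
        // First non-null result wins, mirroring how PROVIDERS is consumed.
        static String create(Map<String, String> props, List<Function<Map<String, String>, String>> providers) {
            for (Function<Map<String, String>, String> p : providers) {
                String match = p.apply(props);
                if (match != null) {
                    return match;
                }
            }
            throw new IllegalArgumentException("no provider matched: " + props);
        }

        public static void main(String[] args) {
            // Hypothetical guesses: the generic key may be present alongside a more
            // specific provider's key, so the generic guess is tried last.
            List<Function<Map<String, String>, String>> providers = Arrays.asList(
                    p -> p.containsKey("oss.endpoint") ? "oss-hdfs" : null,
                    p -> p.containsKey("fs.defaultFS") ? "hdfs" : null);
            // Prints "oss-hdfs" even though the generic guess would also fire.
            System.out.println(create(Map.of("oss.endpoint", "e", "fs.defaultFS", "f"), providers));
        }
    }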
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
index cfbb3e560f3..b2c6957ce38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
@@ -124,7 +124,7 @@ public interface FileSystem {
      * @param fileNumLimit limit the total number of files to be listed.
      * @return
      */
-    default String globListWithLimit(String remotePath, List<String> result,
+    default String globListWithLimit(String remotePath, List<RemoteFile> result,
             String startFile, long fileSizeLimit, long fileNumLimit) {
         throw new UnsupportedOperationException("Unsupported operation glob list with limit on current file system.");
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 0a4c9159881..4c79df0acf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -642,7 +642,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
      *
      * @return The largest file name after listObject this time
      */
-    public String globListWithLimit(String remotePath, List<String> result, String startFile,
+    public String globListWithLimit(String remotePath, List<RemoteFile> result, String startFile,
             long fileSizeLimit, long fileNumLimit) {
         long roundCnt = 0;
         long elementCnt = 0;
@@ -704,9 +704,16 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
                         }
                         matchCnt++;
+                        RemoteFile remoteFile = new RemoteFile(objPath.getFileName().toString(),
+                                !isPrefix,
+                                isPrefix ? -1 : obj.size(),
+                                isPrefix ? -1 : obj.size(),
+                                isPrefix ? 0 : obj.lastModified().toEpochMilli()
+                        );
+                        remoteFile.setBucket(bucket);
+                        remoteFile.setParentPath(objPath.getParent().toString());
                         matchFileSize += obj.size();
-                        String remoteFileName = "s3://" + bucket + "/" + objPath;
-                        result.add(remoteFileName);
+                        result.add(remoteFile);
                         if (reachLimit(result.size(), matchFileSize, fileSizeLimit, fileNumLimit)) {
                             break;
@@ -718,8 +725,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
             }
             //record current last object file name
             S3Object lastS3Object = response.contents().get(response.contents().size() - 1);
-            java.nio.file.Path lastObjPath = Paths.get(lastS3Object.key());
-            currentMaxFile = "s3://" + bucket + "/" + lastObjPath;
+            currentMaxFile = lastS3Object.key();
             isTruncated = response.isTruncated();
             if (isTruncated) {
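
Note: globListWithLimit now returns structured RemoteFile entries (with bucket and parent path set) instead of pre-joined "s3://bucket/key" strings, and currentMaxFile keeps the raw object key. A small sketch of how a caller can rebuild the full path (a stand-in class, not the Doris RemoteFile):

    // RemoteFileSketch stands in for org.apache.doris.fs.remote.RemoteFile.
    class RemoteFileSketch {
        String name;       // e.g. "example_0.csv"
        String bucket;     // set via setBucket(...) in the hunk above
        String parentPath; // set via setParentPath(...)

        RemoteFileSketch(String name, String bucket, String parentPath) {
            this.name = name;
            this.bucket = bucket;
            this.parentPath = parentPath;
        }

        // Rebuild the full object path the old code returned as a plain string.
        String fullS3Path() {
            return String.format("s3://%s/%s/%s", bucket, parentPath, name);
        }

        public static void main(String[] args) {
            RemoteFileSketch f = new RemoteFileSketch("example_0.csv", "my-bucket", "regression/load/data");
            System.out.println(f.fullS3Path()); // s3://my-bucket/regression/load/data/example_0.csv
        }
    }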
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
index 1f6f0225278..ac1f8add947 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
@@ -36,6 +36,8 @@ public class RemoteFile {
private long modificationTime;
private Path path;
BlockLocation[] blockLocations;
+ private String parentPath;
+ private String bucket;
public RemoteFile(String name, boolean isFile, long size, long blockSize) {
this(name, null, isFile, !isFile, size, blockSize, 0, null);
@@ -75,6 +77,22 @@ public class RemoteFile {
this.path = path;
}
+ public String getBucket() {
+ return bucket;
+ }
+
+ public void setBucket(String bucket) {
+ this.bucket = bucket;
+ }
+
+ public String getParentPath() {
+ return parentPath;
+ }
+
+ public void setParentPath(String parentPath) {
+ this.parentPath = parentPath;
+ }
+
public boolean isFile() {
return isFile;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 9c409a66a29..d4e5a23c20e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -69,7 +69,7 @@ public class S3FileSystem extends ObjFileSystem {
}
     @Override
-    public String globListWithLimit(String remotePath, List<String> result, String startFile,
+    public String globListWithLimit(String remotePath, List<RemoteFile> result, String startFile,
             long fileSizeLimit, long fileNumLimit) {
         S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
         return objStorage.globListWithLimit(remotePath, result, startFile, fileSizeLimit, fileNumLimit);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 83b84348ff7..d08942460f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -99,13 +99,13 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
@SerializedName(value = "stc")
- private AtomicLong succeedTaskCount = new AtomicLong(0);
+ protected AtomicLong succeedTaskCount = new AtomicLong(0);
@SerializedName(value = "ftc")
- private AtomicLong failedTaskCount = new AtomicLong(0);
+ protected AtomicLong failedTaskCount = new AtomicLong(0);
@SerializedName(value = "ctc")
- private AtomicLong canceledTaskCount = new AtomicLong(0);
+ protected AtomicLong canceledTaskCount = new AtomicLong(0);
public AbstractJob() {
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java
index 3985b59bf16..0cb08747858 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java
@@ -17,9 +17,6 @@
package org.apache.doris.job.base;
-import org.apache.doris.common.AnalysisException;
public interface JobProperties {
- default void validate() throws AnalysisException {
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 24df73cbbac..387714e8805 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -21,14 +21,18 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.PauseReason;
@@ -75,8 +79,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         TxnStateChangeCallback, GsonPostProcessable {
private final long dbId;
private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
- @SerializedName("fm")
- private FailMsg failMsg;
@Getter
protected PauseReason pauseReason;
@Getter
@@ -86,7 +88,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
@Setter
protected long autoResumeCount;
@Getter
- @SerializedName("jp")
+ @SerializedName("props")
+ private final Map<String, String> properties;
private StreamingJobProperties jobProperties;
@Getter
@SerializedName("tvf")
@@ -108,26 +111,40 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
JobExecutionConfiguration jobConfig,
Long createTimeMs,
String executeSql,
- StreamingJobProperties jobProperties) {
+ Map<String, String> properties) {
super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
jobConfig, createTimeMs, executeSql);
this.dbId = ConnectContext.get().getCurrentDbId();
- this.jobProperties = jobProperties;
+ this.properties = properties;
init();
}
     private void init() {
         try {
+            this.jobProperties = new StreamingJobProperties(properties);
+            jobProperties.validate();
+            // build time definition
+            JobExecutionConfiguration execConfig = getJobConfig();
+            TimerDefinition timerDefinition = new TimerDefinition();
+            timerDefinition.setInterval(jobProperties.getMaxIntervalSecond());
+            timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
+            timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs());
+            execConfig.setTimerDefinition(timerDefinition);
+
             UnboundTVFRelation currentTvf = getCurrentTvf();
+            this.tvfType = currentTvf.getFunctionName();
             this.originTvfProps = currentTvf.getProperties().getMap();
             this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
+        } catch (AnalysisException ae) {
+            log.warn("parse streaming insert job failed, props: {}", properties, ae);
+            throw new RuntimeException("parse streaming insert job failed, " + ae.getMessage());
         } catch (Exception ex) {
             log.warn("init streaming insert job failed, sql: {}", getExecuteSql(), ex);
-            throw new RuntimeException("init streaming insert job failed, sql: " + getExecuteSql(), ex);
+            throw new RuntimeException("init streaming insert job failed, " + ex.getMessage());
         }
     }

-    private UnboundTVFRelation getCurrentTvf() throws Exception {
+    private UnboundTVFRelation getCurrentTvf() {
         if (baseCommand == null) {
             this.baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(getExecuteSql());
         }
@@ -170,7 +187,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
     protected StreamingInsertTask createStreamingInsertTask() {
         this.runningStreamTask = new StreamingInsertTask(getJobId(), AbstractTask.getNextTaskId(), getExecuteSql(),
-                offsetProvider, getCurrentDbName(), jobProperties, getCreateUser());
+                offsetProvider, getCurrentDbName(), jobProperties, originTvfProps, getCreateUser());
         Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
         this.runningStreamTask.setStatus(TaskStatus.PENDING);
         return runningStreamTask;
@@ -213,17 +230,21 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
     }
     public void onStreamTaskFail(StreamingInsertTask task) throws JobException {
+        failedTaskCount.incrementAndGet();
         Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
         if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
-            this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, task.getErrMsg());
+            this.pauseReason = new PauseReason(InternalErrorCode.INTERNAL_ERR, task.getErrMsg());
         }
         updateJobStatus(JobStatus.PAUSED);
     }

     public void onStreamTaskSuccess(StreamingInsertTask task) {
+        succeedTaskCount.incrementAndGet();
         Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
         StreamingInsertTask nextTask = createStreamingInsertTask();
         this.runningStreamTask = nextTask;
+        //todo: maybe fetch from txn attachment?
+        offsetProvider.updateOffset(task.getRunningOffset());
     }

     private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
@@ -262,7 +283,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         trow.addToColumnValue(new TCell().setStringVal(getJobName()));
         trow.addToColumnValue(new TCell().setStringVal(getCreateUser().getQualifiedUser()));
         trow.addToColumnValue(new TCell().setStringVal(getJobConfig().getExecuteType().name()));
-        trow.addToColumnValue(new TCell().setStringVal(getJobConfig().convertRecurringStrategyToString()));
+        trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
         trow.addToColumnValue(new TCell().setStringVal(getExecuteSql()));
         trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
@@ -276,16 +297,16 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         } else {
             trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
         }
+
         if (offsetProvider != null && offsetProvider.getRemoteOffset() != null) {
             trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getRemoteOffset()));
         } else {
             trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
         }
-        trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getRemoteOffset()));
         trow.addToColumnValue(new TCell().setStringVal(
                 jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson()));
-        trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg()));
+        trow.addToColumnValue(new TCell().setStringVal(pauseReason == null ? FeConstants.null_string : pauseReason.getMsg()));
         return trow;
     }
@@ -336,7 +357,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
     @Override
     public void beforeAborted(TransactionState txnState) throws TransactionException {
-
     }

     @Override
@@ -387,7 +407,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
     @Override
     public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason)
             throws UserException {
-
     }

     @Override
@@ -402,14 +421,16 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
     @Override
     public void replayOnVisible(TransactionState txnState) {
-
-
     }

     @Override
     public void gsonPostProcess() throws IOException {
-        if (offsetProvider == null) {
+        if (offsetProvider == null && tvfType != null) {
             offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
         }
+
+        if (jobProperties == null && properties != null) {
+            jobProperties = new StreamingJobProperties(properties);
+        }
     }
 }
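
Note: with @SerializedName("props") only the raw property map is persisted; gsonPostProcess() rebuilds jobProperties and offsetProvider after deserialization. A compressed sketch of that persist-raw, rebuild-derived pattern (illustrative names, not the Doris classes):

    import com.google.gson.Gson;
    import com.google.gson.annotations.SerializedName;
    import java.util.HashMap;
    import java.util.Map;

    class JobStateSketch {
        @SerializedName("props")
        Map<String, String> properties = new HashMap<>();
        transient Map<String, String> derived; // stands in for jobProperties/offsetProvider

        // Mirrors gsonPostProcess(): derived state is rebuilt from the persisted map.
        void postProcess() {
            if (derived == null && properties != null) {
                derived = new HashMap<>(properties);
            }
        }

        public static void main(String[] args) {
            Gson gson = new Gson();
            JobStateSketch job = new JobStateSketch();
            job.properties.put("s3.batch_files", "256"); // hypothetical key
            JobStateSketch loaded = gson.fromJson(gson.toJson(job), JobStateSketch.class);
            loaded.postProcess();
            System.out.println(loaded.derived); // {s3.batch_files=256}
        }
    }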
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 73ac267e678..b1e22a0a7cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -29,6 +29,7 @@ import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
@@ -39,6 +40,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,6 +67,7 @@ public class StreamingInsertTask {
private Offset runningOffset;
private AtomicBoolean isCanceled = new AtomicBoolean(false);
private StreamingJobProperties jobProperties;
+ private Map<String, String> originTvfProps;
SourceOffsetProvider offsetProvider;
public StreamingInsertTask(long jobId,
@@ -73,6 +76,7 @@ public class StreamingInsertTask {
SourceOffsetProvider offsetProvider,
String currentDb,
StreamingJobProperties jobProperties,
+ Map<String, String> originTvfProps,
UserIdentity userIdentity) {
this.jobId = jobId;
this.taskId = taskId;
@@ -81,6 +85,7 @@ public class StreamingInsertTask {
this.currentDb = currentDb;
this.offsetProvider = offsetProvider;
this.jobProperties = jobProperties;
+ this.originTvfProps = originTvfProps;
this.labelName = getJobId() + LABEL_SPLITTER + getTaskId();
this.createTimeMs = System.currentTimeMillis();
}
@@ -118,8 +123,15 @@ public class StreamingInsertTask {
         StatementContext statementContext = new StatementContext();
         ctx.setStatementContext(statementContext);
-        this.runningOffset = offsetProvider.getNextOffset(jobProperties, jobProperties.getProperties());
-        this.taskCommand = offsetProvider.rewriteTvfParams(sql, runningOffset);
+        this.runningOffset = offsetProvider.getNextOffset(jobProperties, originTvfProps);
+        InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql);
+        StmtExecutor baseStmtExecutor =
+                new StmtExecutor(ctx, new LogicalPlanAdapter(baseCommand, ctx.getStatementContext()));
+        baseCommand.initPlan(ctx, baseStmtExecutor, false);
+        if (!baseCommand.getParsedPlan().isPresent()) {
+            throw new JobException("Can not get Parsed plan");
+        }
+        this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, runningOffset);
         this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
         this.taskCommand.setJobId(getTaskId());
         this.stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index 4f463a090d5..e71b169ef21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -24,6 +24,7 @@ import org.apache.doris.job.exception.JobException;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.SessionVariable;
+import com.google.gson.annotations.SerializedName;
import lombok.Data;
import java.util.HashMap;
@@ -43,15 +44,18 @@ public class StreamingJobProperties implements JobProperties {
private final Map<String, String> properties;
private long maxIntervalSecond;
- private int maxRetry;
private long s3BatchFiles;
private long s3BatchSize;
public StreamingJobProperties(Map<String, String> jobProperties) {
this.properties = jobProperties;
+ if (properties.isEmpty()) {
+ this.maxIntervalSecond = DEFAULT_MAX_INTERVAL_SECOND;
+ this.s3BatchFiles = DEFAULT_S3_BATCH_FILES;
+ this.s3BatchSize = DEFAULT_S3_BATCH_SIZE;
+ }
}
-    @Override
     public void validate() throws AnalysisException {
         this.maxIntervalSecond = Util.getLongPropertyOrDefault(
                 properties.get(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY),
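
Note: validate() fills the typed fields from the raw map via Util.getLongPropertyOrDefault, and the constructor falls back to defaults when no properties are given. A minimal sketch of the parse-or-default behavior (the helper, key spelling, and default values are stand-ins):

    import java.util.Map;

    class PropsDefaultsSketch {
        // Stand-in for Util.getLongPropertyOrDefault: absent values fall back to the default.
        static long longOrDefault(String raw, long def) {
            return raw == null ? def : Long.parseLong(raw);
        }

        public static void main(String[] args) {
            Map<String, String> props = Map.of("max_interval_second", "30"); // hypothetical key
            System.out.println(longOrDefault(props.get("max_interval_second"), 10)); // 30
            System.out.println(longOrDefault(props.get("s3.batch_files"), 256));     // absent -> 256
        }
    }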
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index d9b2264d5b6..e670dac553c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -62,7 +62,7 @@ public interface SourceOffsetProvider {
      * @param nextOffset
      * @return rewritten InsertIntoTableCommand
      */
-    InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset nextOffset) throws Exception;
+    InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand, Offset nextOffset);
/**
* Update the offset of the source.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index dd6927935bb..cbd8645cc59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -30,7 +30,7 @@ import java.util.List;
public class S3Offset implements Offset {
String startFile;
String endFile;
- List<String> fileLists;
+ String fileLists;
@Override
public void setEndOffset(String endOffset) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index a95f4af65c9..df4a344f064 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -19,20 +19,18 @@ package org.apache.doris.job.offset.s3;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
-import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.trees.expressions.Properties;
-import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
-import org.apache.doris.qe.ConnectContext;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
import lombok.extern.log4j.Log4j2;
import java.util.ArrayList;
@@ -40,12 +38,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
@Log4j2
public class S3SourceOffsetProvider implements SourceOffsetProvider {
S3Offset currentOffset;
String maxRemoteEndFile;
- InsertIntoTableCommand baseCommand;
@Override
public String getSourceType() {
@@ -54,19 +52,29 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
     @Override
     public S3Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> properties) {
+        Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        copiedProps.putAll(properties);
         S3Offset offset = new S3Offset();
-        List<String> rfiles = new ArrayList<>();
+        List<RemoteFile> rfiles = new ArrayList<>();
         String startFile = currentOffset == null ? null : currentOffset.endFile;
         String filePath = null;
-        StorageProperties storageProperties = StorageProperties.createPrimary(properties);
+        StorageProperties storageProperties = StorageProperties.createPrimary(copiedProps);
         try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) {
-            String uri = storageProperties.validateAndGetUri(properties);
+            String uri = storageProperties.validateAndGetUri(copiedProps);
             filePath = storageProperties.validateAndNormalizeUri(uri);
             maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, startFile,
                     jobProps.getS3BatchFiles(), jobProps.getS3BatchSize());
             offset.setStartFile(startFile);
-            offset.setEndFile(rfiles.get(rfiles.size() - 1));
-            offset.setFileLists(rfiles);
+            //todo: The path may be in the form of bucket/dir/*/*,
+            // but currently only the case where the last segment is * is handled.
+            if (!rfiles.isEmpty()) {
+                String bucket = rfiles.get(0).getBucket();
+                String parentPath = rfiles.get(0).getParentPath();
+                String filePaths = rfiles.stream().map(RemoteFile::getName).collect(Collectors.joining(",", "{", "}"));
+                String finalFiles = String.format("s3://%s/%s/%s", bucket, parentPath, filePaths);
+                offset.setEndFile(String.format("s3://%s/%s/%s", bucket, parentPath, rfiles.get(rfiles.size() - 1).getName()));
+                offset.setFileLists(finalFiles);
+            }
         } catch (Exception e) {
             log.warn("list path exception, path={}", filePath, e);
             throw new RuntimeException(e);
@@ -93,23 +101,18 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
     }

     @Override
-    public InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset runningOffset) throws Exception {
+    public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand, Offset runningOffset) {
         S3Offset offset = (S3Offset) runningOffset;
         Map<String, String> props = new HashMap<>();
-        String finalUri = "{" + String.join(",", offset.getFileLists()) + "}";
-        props.put("uri", finalUri);
-        if (baseCommand == null) {
-            this.baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(executeSql);
-            this.baseCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
-        }
-
         // rewrite plan
-        Plan rewritePlan = baseCommand.getLogicalQuery().rewriteUp(plan -> {
-            if (plan instanceof LogicalTVFRelation) {
-                LogicalTVFRelation originTvfRel = (LogicalTVFRelation) plan;
-                LogicalTVFRelation newRvfRel = new LogicalTVFRelation(
-                        originTvfRel.getRelationId(), new S3(new Properties(props)), ImmutableList.of());
-                return newRvfRel;
+        Plan rewritePlan = originCommand.getParsedPlan().get().rewriteUp(plan -> {
+            if (plan instanceof UnboundTVFRelation) {
+                UnboundTVFRelation originTvfRel = (UnboundTVFRelation) plan;
+                Map<String, String> oriMap = originTvfRel.getProperties().getMap();
+                props.putAll(oriMap);
+                props.put("uri", offset.getFileLists());
+                return new UnboundTVFRelation(
+                        originTvfRel.getRelationId(), originTvfRel.getFunctionName(), new Properties(props));
             }
             return plan;
         });
@@ -120,14 +123,17 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
     @Override
     public void updateOffset(Offset offset) {
         this.currentOffset = (S3Offset) offset;
+        this.currentOffset.setFileLists(null);
     }

     @Override
     public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
-        StorageProperties storageProperties = StorageProperties.createPrimary(properties);
+        Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        copiedProps.putAll(properties);
+        StorageProperties storageProperties = StorageProperties.createPrimary(copiedProps);
         String startFile = currentOffset == null ? null : currentOffset.endFile;
         try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) {
-            String uri = storageProperties.validateAndGetUri(properties);
+            String uri = storageProperties.validateAndGetUri(copiedProps);
             String filePath = storageProperties.validateAndNormalizeUri(uri);
             maxRemoteEndFile = fileSystem.globListWithLimit(filePath, new ArrayList<>(), startFile,
                     1, 1);
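
Note: since S3Offset.fileLists is now a single string, getNextOffset() packs the whole batch into one brace-expansion uri under the shared parent path, and endFile records where the next listing resumes. A runnable sketch of that uri construction (hypothetical bucket and file names):

    import java.util.List;
    import java.util.stream.Collectors;

    class BatchUriSketch {
        public static void main(String[] args) {
            String bucket = "my-bucket";            // from RemoteFile.getBucket()
            String parent = "regression/load/data"; // from RemoteFile.getParentPath()
            List<String> names = List.of("example_0.csv", "example_1.csv");

            // Join the batch into a single brace-expansion uri for the S3 TVF.
            String files = names.stream().collect(Collectors.joining(",", "{", "}"));
            String uri = String.format("s3://%s/%s/%s", bucket, parent, files);
            System.out.println(uri); // s3://my-bucket/regression/load/data/{example_0.csv,example_1.csv}

            // endFile records the last listed object so the next batch resumes after it.
            String endFile = String.format("s3://%s/%s/%s", bucket, parent, names.get(names.size() - 1));
            System.out.println(endFile);
        }
    }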
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 53a90893b81..b1823de4a1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -22,13 +22,9 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
-import org.apache.doris.job.base.JobExecutionConfiguration;
-import org.apache.doris.job.base.TimerDefinition;
-import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
-import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
@@ -81,25 +77,18 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync {
         if (job instanceof StreamingInsertJob) {
             StreamingInsertJob originJob = (StreamingInsertJob) job;
             String updateSQL = StringUtils.isEmpty(sql) ? originJob.getExecuteSql() : sql;
-            Map<String, String> updateProps = properties == null || properties.isEmpty() ? originJob.getJobProperties()
-                    .getProperties() : properties;
-            StreamingJobProperties streamJobProps = new StreamingJobProperties(updateProps);
-            // rebuild time definition
-            JobExecutionConfiguration execConfig = originJob.getJobConfig();
-            TimerDefinition timerDefinition = new TimerDefinition();
-            timerDefinition.setInterval(streamJobProps.getMaxIntervalSecond());
-            timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
-            timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs());
-            execConfig.setTimerDefinition(timerDefinition);
+            Map<String, String> updateProps =
+                    properties == null || properties.isEmpty() ? originJob.getProperties() : properties;
+
             return new StreamingInsertJob(jobName,
                     job.getJobStatus(),
                     job.getCurrentDbName(),
                     job.getComment(),
                     ConnectContext.get().getCurrentUserIdentity(),
-                    execConfig,
+                    originJob.getJobConfig(),
                     System.currentTimeMillis(),
                     updateSQL,
-                    streamJobProps);
+                    updateProps);
         } else {
             throw new JobException("Unsupported job type for ALTER:" + job.getJobType());
         }
@@ -117,7 +106,7 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync {
         if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)
                 && job instanceof StreamingInsertJob) {
             StreamingInsertJob streamingJob = (StreamingInsertJob) job;
-            boolean proCheck = checkProperties(streamingJob.getJobProperties().getProperties());
+            boolean proCheck = checkProperties(streamingJob.getProperties());
             boolean sqlCheck = checkSql(streamingJob.getExecuteSql());
             if (!proCheck && !sqlCheck) {
                 throw new AnalysisException("No properties or sql changed in ALTER JOB");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
index 51d0dddc7cb..40a4d583059 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
@@ -125,11 +125,8 @@ public class CreateJobInfo {
         // check its insert stmt,currently only support insert stmt
         JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
         JobExecuteType executeType = intervalOptional.isPresent() ? JobExecuteType.RECURRING : JobExecuteType.ONE_TIME;
-        JobProperties properties = null;
         if (streamingJob) {
             executeType = JobExecuteType.STREAMING;
-            properties = new StreamingJobProperties(jobProperties);
-            properties.validate();
             jobExecutionConfiguration.setImmediate(true);
         }
         jobExecutionConfiguration.setExecuteType(executeType);
@@ -138,21 +135,18 @@ public class CreateJobInfo {
         if (executeType.equals(JobExecuteType.ONE_TIME)) {
             buildOnceJob(timerDefinition, jobExecutionConfiguration);
         } else if (executeType.equals(JobExecuteType.STREAMING)) {
-            buildStreamingJob(timerDefinition, properties);
+            buildStreamingJob(timerDefinition);
         } else {
             buildRecurringJob(timerDefinition, jobExecutionConfiguration);
         }
         jobExecutionConfiguration.setTimerDefinition(timerDefinition);
-        return analyzeAndCreateJob(executeSql, dbName, jobExecutionConfiguration, properties);
+        return analyzeAndCreateJob(executeSql, dbName, jobExecutionConfiguration, jobProperties);
     }
-    private void buildStreamingJob(TimerDefinition timerDefinition, JobProperties props)
-            throws AnalysisException {
-        StreamingJobProperties properties = (StreamingJobProperties) props;
-        timerDefinition.setInterval(properties.getMaxIntervalSecond());
+    private void buildStreamingJob(TimerDefinition timerDefinition) {
+        // timerDefinition.setInterval(properties.getMaxIntervalSecond());
         timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
         timerDefinition.setStartTimeMs(System.currentTimeMillis());
-        properties.validate();
     }
/**
@@ -237,7 +231,7 @@ public class CreateJobInfo {
      */
     private AbstractJob analyzeAndCreateJob(String sql, String currentDbName,
                                             JobExecutionConfiguration jobExecutionConfiguration,
-                                            JobProperties properties) throws UserException {
+                                            Map<String, String> properties) throws UserException {
         if (jobExecutionConfiguration.getExecuteType().equals(JobExecuteType.STREAMING)) {
             return analyzeAndCreateStreamingInsertJob(sql, currentDbName, jobExecutionConfiguration, properties);
         } else {
@@ -271,13 +265,13 @@ public class CreateJobInfo {
     }

     private AbstractJob analyzeAndCreateStreamingInsertJob(String sql, String currentDbName,
-            JobExecutionConfiguration jobExecutionConfiguration, JobProperties properties) throws UserException {
+            JobExecutionConfiguration jobExecutionConfiguration, Map<String, String> properties) throws UserException {
         NereidsParser parser = new NereidsParser();
         LogicalPlan logicalPlan = parser.parseSingle(sql);
         if (logicalPlan instanceof InsertIntoTableCommand) {
-            // InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan;
+            InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan;
             try {
-                // insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
+                insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
                 return new StreamingInsertJob(labelNameOptional.get(),
                         JobStatus.PENDING,
                         currentDbName,
@@ -286,7 +280,7 @@ public class CreateJobInfo {
                         jobExecutionConfiguration,
                         System.currentTimeMillis(),
                         sql,
-                        (StreamingJobProperties) properties);
+                        properties);
             } catch (Exception e) {
                 throw new AnalysisException(e.getMessage());
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 6736b16f8af..e774499f361 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -61,6 +61,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -103,6 +104,7 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
private Optional<LogicalPlan> logicalQuery;
private Optional<String> labelName;
private Optional<String> branchName;
+ private Optional<Plan> parsedPlan;
/**
* When source it's from job scheduler,it will be set.
*/
@@ -153,6 +155,10 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
return logicalQuery.orElse(originLogicalQuery);
}
+ public Optional<Plan> getParsedPlan() {
+ return parsedPlan;
+ }
+
protected void setLogicalQuery(LogicalPlan logicalQuery) {
this.logicalQuery = Optional.of(logicalQuery);
}
@@ -233,6 +239,7 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
throw new IllegalStateException(e.getMessage(), e);
}
insertExecutor = buildResult.executor;
+            parsedPlan = Optional.ofNullable(buildResult.planner.getParsedPlan());
if (!needBeginTransaction) {
return insertExecutor;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
index bef0bb92d20..f05f1791f2f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
@@ -20,22 +20,31 @@ package org.apache.doris.catalog;
import org.apache.doris.common.FeConstants;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.functions.table.S3;
+import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateFunctionCommand;
import org.apache.doris.nereids.trees.plans.commands.DropFunctionCommand;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.utframe.DorisAssert;
import org.apache.doris.utframe.UtFrameUtils;
+import com.google.common.collect.ImmutableList;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
@@ -63,9 +72,27 @@ public class DropFunctionTest {
     public void testDropGlobalFunction() throws Exception {
         ConnectContext ctx = UtFrameUtils.createDefaultCtx();
         // 1. create database db1
-        String sql = "create database db1;";
+        //String sql = "create database db1;";
+        String sql = "insert into db1.tb select * from s3('url'='s3://a/*.csv')";
         NereidsParser nereidsParser = new NereidsParser();
         LogicalPlan logicalPlan = nereidsParser.parseSingle(sql);
+        InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql);
+        baseCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
+        Map<String, String> map = new HashMap<>();
+        map.put("url", "s3:/xxxx/*.");
+        // rewrite plan
+        Plan rewritePlan = baseCommand.getLogicalQuery().rewriteUp(plan -> {
+            if (plan instanceof LogicalTVFRelation) {
+                LogicalTVFRelation originTvfRel = (LogicalTVFRelation) plan;
+                LogicalTVFRelation newRvfRel = new LogicalTVFRelation(
+                        originTvfRel.getRelationId(), new S3(new Properties(map)), ImmutableList.of());
+                return newRvfRel;
+            }
+            return plan;
+        });
+        InsertIntoTableCommand s = new InsertIntoTableCommand((LogicalPlan) rewritePlan, Optional.empty(), Optional.empty(),
+                Optional.empty(), true, Optional.empty());
+
         StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
         if (logicalPlan instanceof CreateDatabaseCommand) {
             ((CreateDatabaseCommand) logicalPlan).run(connectContext, stmtExecutor);
diff --git a/regression-test/data/job_p0/streaming_job/test_streaming_insert_job.out b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job.out
new file mode 100644
index 00000000000..867b1f9432b
Binary files /dev/null and b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job.out differ
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
new file mode 100644
index 00000000000..bd8ee2759fd
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job") {
+ def tableName = "test_streaming_insert_job_tbl"
+ def jobName = "test_streaming_insert_job_name"
+
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+
+    // create streaming job
+    sql """
+        CREATE JOB ${jobName} ON STREAMING DO INSERT INTO ${tableName}
+        SELECT * FROM S3
+        (
+            "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+    Awaitility.await().atMost(30, SECONDS).until(
+        {
+            print("check succeed task count")
+            def jobSucceedCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='STREAMING' """
+            // check that the job exists and its succeed task count is at least 1
+            jobSucceedCount.size() == 1 && '1' <= jobSucceedCount.get(0).get(0)
+        }
+    )
+
+ sql """
+ PAUSE JOB where jobname = '${jobName}'
+ """
+ def pausedJobStatus = sql """
+ select status from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert pausedJobStatus.get(0).get(0) == "PAUSED"
+
+ qt_select """ SELECT * FROM ${tableName} order by c1 """
+
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+    def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+    assert jobCountRsp.get(0).get(0) == 0
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]