This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new e0ee446b8d6 [Feature](wip) improve streamjob and delete alterstreamjob op (#56308)
e0ee446b8d6 is described below
commit e0ee446b8d6458cfc495a594d7948a2f464953e2
Author: wudi <[email protected]>
AuthorDate: Mon Sep 22 18:39:03 2025 +0800
[Feature](wip) improve streamjob and delete alterstreamjob op (#56308)
### What problem does this PR solve?
Improve the streaming job implementation and remove the dedicated AlterStreamingJob operation log: streaming job updates are now persisted and replayed through the generic update-job edit log path.
---
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 2 +-
.../property/storage/HdfsPropertiesUtils.java | 12 ++-
.../property/storage/StorageProperties.java | 57 ++------------
.../main/java/org/apache/doris/fs/FileSystem.java | 2 +-
.../java/org/apache/doris/fs/GlobListResult.java | 57 ++++++++++++++
.../java/org/apache/doris/fs/obj/S3ObjStorage.java | 33 ++-------
.../org/apache/doris/fs/remote/RemoteFile.java | 18 -----
.../org/apache/doris/fs/remote/S3FileSystem.java | 3 +-
.../org/apache/doris/job/base/AbstractJob.java | 9 +--
.../doris/job/executor/DispatchTaskHandler.java | 5 +-
.../insert/streaming/StreamingInsertJob.java | 30 ++++----
.../insert/streaming/StreamingJobProperties.java | 31 ++++----
.../org/apache/doris/job/manager/JobManager.java | 23 +++---
.../java/org/apache/doris/job/offset/Offset.java | 2 +
.../doris/job/offset/SourceOffsetProvider.java | 14 +---
.../org/apache/doris/job/offset/s3/S3Offset.java | 6 ++
.../job/offset/s3/S3SourceOffsetProvider.java | 69 ++++++++++-------
.../org/apache/doris/journal/JournalEntity.java | 6 --
.../trees/plans/commands/AlterJobCommand.java | 6 +-
.../trees/plans/commands/info/CreateJobInfo.java | 4 +-
.../commands/insert/InsertIntoTableCommand.java | 19 +----
.../persist/AlterStreamingJobOperationLog.java | 86 ----------------------
.../java/org/apache/doris/persist/EditLog.java | 9 ---
.../org/apache/doris/persist/OperationType.java | 2 -
.../java/org/apache/doris/qe/SessionVariable.java | 1 +
.../streaming_job/test_streaming_insert_job.groovy | 10 +--
26 files changed, 195 insertions(+), 321 deletions(-)
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index d4cfe9ead5d..3062dfef988 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -112,7 +112,7 @@ supportedJobStatement
        commentSpec?
        DO supportedDmlStatement                                                                    #createScheduledJob
    | PAUSE JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL)                      #pauseJob
-    | ALTER JOB FOR (jobName=multipartIdentifier) (propertyClause | supportedDmlStatement | propertyClause supportedDmlStatement)   #alterJob
+    | ALTER JOB (jobName=multipartIdentifier) (propertyClause | supportedDmlStatement | propertyClause supportedDmlStatement)       #alterJob
    | DROP JOB (IF EXISTS)? WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL)          #dropJob
    | RESUME JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL)                     #resumeJob
    | CANCEL TASK WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) AND (taskIdKey=identifier) EQ (taskIdValue=INTEGER_VALUE)   #cancelJobTask
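
(For readers skimming the grammar change: the FOR keyword is dropped from ALTER JOB. Assuming a job named my_streaming_job -- the name is hypothetical -- the statement changes from

    ALTER JOB FOR my_streaming_job PROPERTIES("session.insert_max_filter_ratio" = "0.5") INSERT INTO ...

to

    ALTER JOB my_streaming_job PROPERTIES("session.insert_max_filter_ratio" = "0.5") INSERT INTO ...

which is exactly what the regression test at the end of this patch exercises.)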
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
index 7fc091daa64..8f2e92d1e64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
@@ -23,6 +23,8 @@ import org.apache.doris.datasource.property.storage.exception.StoragePropertiesE
import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.io.UnsupportedEncodingException;
import java.net.URI;
@@ -38,6 +40,7 @@ import java.util.Set;
import java.util.stream.Collectors;
public class HdfsPropertiesUtils {
+    private static final Logger LOG = LogManager.getLogger(HdfsPropertiesUtils.class);
private static final String URI_KEY = "uri";
private static final String STANDARD_HDFS_PREFIX = "hdfs://";
private static final String EMPTY_HDFS_PREFIX = "hdfs:///";
@@ -63,7 +66,14 @@ public class HdfsPropertiesUtils {
if (StringUtils.isBlank(uriStr)) {
return false;
}
-        URI uri = URI.create(uriStr);
+        URI uri;
+        try {
+            uri = URI.create(uriStr);
+        } catch (Exception ex) {
+            // The S3 glob syntax contains '{', which causes URI.create to throw here.
+            LOG.warn("Failed to validate uri is hdfs uri, {}", ex.getMessage());
+            return false;
+        }
        String schema = uri.getScheme();
        if (StringUtils.isBlank(schema)) {
            throw new IllegalArgumentException("Invalid uri: " + uriStr + ", extract schema is null");
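
(A self-contained illustration of the failure mode this guard handles -- plain JDK behavior, independent of Doris: '{' is not a legal URI character, so URI.create rejects S3 brace-expansion globs:

    import java.net.URI;

    public class GlobUriDemo {
        public static void main(String[] args) {
            // A plain scheme://path URI parses fine.
            System.out.println(URI.create("hdfs://nn:8020/data/a.csv"));
            // A brace-expansion glob, as built for S3 file lists, throws
            // IllegalArgumentException (wrapping URISyntaxException).
            try {
                URI.create("s3://bucket/dir/{a.csv,b.csv}");
            } catch (IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }

)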
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 337b897f256..2ce87f9ffb9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -146,62 +146,17 @@ public abstract class StorageProperties extends ConnectionProperties {
     * @throws RuntimeException if no supported storage type is found
     */
    public static StorageProperties createPrimary(Map<String, String> origProps) {
-        StorageProperties p = createPrimaryInternal(origProps);
-        if (p == null) {
-            for (Function<Map<String, String>, StorageProperties> func : PROVIDERS) {
-                p = func.apply(origProps);
-                if (p != null) {
-                    break;
-                }
+        for (Function<Map<String, String>, StorageProperties> func : PROVIDERS) {
+            StorageProperties p = func.apply(origProps);
+            if (p != null) {
+                p.initNormalizeAndCheckProps();
+                p.initializeHadoopStorageConfig();
+                return p;
            }
        }
-        if (p != null) {
-            p.initNormalizeAndCheckProps();
-            p.initializeHadoopStorageConfig();
-            return p;
-        }
        throw new StoragePropertiesException("No supported storage type found. Please check your configuration.");
    }

-    private static StorageProperties createPrimaryInternal(Map<String, String> origProps) {
- String provider = origProps.get(FS_PROVIDER_KEY);
- if (provider == null) {
- return null;
- }
-
- try {
- Type type = Type.valueOf(provider.trim().toUpperCase());
- switch (type) {
- case HDFS:
- return new HdfsProperties(origProps);
- case OSS_HDFS:
- return new OSSHdfsProperties(origProps);
- case S3:
- return new S3Properties(origProps);
- case OSS:
- return new OSSProperties(origProps);
- case OBS:
- return new OBSProperties(origProps);
- case COS:
- return new COSProperties(origProps);
- case GCS:
- return new GCSProperties(origProps);
- case AZURE:
- return new AzureProperties(origProps);
- case MINIO:
- return new MinioProperties(origProps);
- case BROKER:
- return new BrokerProperties(origProps);
- case LOCAL:
- return new LocalProperties(origProps);
- default:
- return null;
- }
- } catch (Exception e) {
- return null;
- }
- }
-
private static final List<Function<Map<String, String>,
StorageProperties>> PROVIDERS =
Arrays.asList(
props -> (isFsSupport(props, FS_HDFS_SUPPORT)
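
(A short usage sketch of the simplified factory -- the property key is hypothetical; each entry in PROVIDERS decides for itself whether it recognizes the map:

    Map<String, String> origProps = new HashMap<>();
    origProps.put("uri", "s3://my-bucket/load/data/*.csv");   // hypothetical key
    // The first matching provider wins; it is normalized and checked before
    // being returned, and StoragePropertiesException is thrown if none match.
    StorageProperties sp = StorageProperties.createPrimary(origProps);

)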
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
index b2c6957ce38..81cdaf5d2a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
@@ -124,7 +124,7 @@ public interface FileSystem {
* @param fileNumLimit limit the total number of files to be listed.
* @return
*/
-    default String globListWithLimit(String remotePath, List<RemoteFile> result,
+    default GlobListResult globListWithLimit(String remotePath, List<RemoteFile> result,
            String startFile, long fileSizeLimit, long fileNumLimit) {
        throw new UnsupportedOperationException("Unsupported operation glob list with limit on current file system.");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/GlobListResult.java b/fe/fe-core/src/main/java/org/apache/doris/fs/GlobListResult.java
new file mode 100644
index 00000000000..af0d2817ffc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/GlobListResult.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.backup.Status;
+
+public class GlobListResult {
+ private final Status status;
+ private final String maxFile;
+ private final String bucket;
+ private final String prefix;
+
+    public GlobListResult(Status status, String maxFile, String bucket, String prefix) {
+ this.status = status;
+ this.maxFile = maxFile;
+ this.bucket = bucket;
+ this.prefix = prefix;
+ }
+
+ public GlobListResult(Status status) {
+ this.status = status;
+ this.maxFile = "";
+ this.bucket = "";
+ this.prefix = "";
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public String getMaxFile() {
+ return maxFile;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public String getPrefix() {
+ return prefix;
+ }
+}
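
(For orientation, a minimal sketch of how a caller consumes the widened API -- the path, limits, and Status#ok() accessor are assumptions based on surrounding FE code; the real call sites are in S3SourceOffsetProvider further down:

    List<RemoteFile> files = new ArrayList<>();
    GlobListResult res = fileSystem.globListWithLimit(
            "s3://my-bucket/load/data/*.csv",   // hypothetical glob path
            files, "",                          // startFile: list from the beginning
            10L * 1024 * 1024 * 1024, 256);     // fileSizeLimit, fileNumLimit
    if (res.getStatus().ok()) {
        // bucket and prefix now travel with the result instead of being
        // duplicated on every RemoteFile (see the RemoteFile diff below).
        String base = "s3://" + res.getBucket() + "/" + res.getPrefix();
    }

)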
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index f7cc644017f..9522449c7d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.util.S3URI;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
+import org.apache.doris.fs.GlobListResult;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.commons.lang3.StringUtils;
@@ -541,13 +542,11 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     * Limit: Starting from startFile, until the total file size is greater than fileSizeLimit,
     * or the number of files is greater than fileNumLimit.
     *
-     * @return The largest file name after listObject this time
+     * @return GlobListResult
     */
-    public String globListWithLimit(String remotePath, List<RemoteFile> result, String startFile,
+    public GlobListResult globListWithLimit(String remotePath, List<RemoteFile> result, String startFile,
            long fileSizeLimit, long fileNumLimit) {
-        GlobListResult globListResult = globListInternal(remotePath, result, true, startFile, fileSizeLimit,
-                fileNumLimit);
-        return globListResult.getMaxFile();
+        return globListInternal(remotePath, result, false, startFile, fileSizeLimit, fileNumLimit);
    }
/**
@@ -643,8 +642,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
                            isPrefix ? -1 : obj.size(),
                            isPrefix ? 0 : obj.lastModified().toEpochMilli()
                    );
-                    remoteFile.setBucket(bucket);
-                    remoteFile.setParentPath(objPath.getParent().toString());
                    result.add(remoteFile);
                    if (hasLimits && reachLimit(result.size(), matchFileSize, fileSizeLimit, fileNumLimit)) {
@@ -677,11 +674,11 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("remotePath:{}, result:{}", remotePath, result);
            }
-            return new GlobListResult(Status.OK, currentMaxFile);
+            return new GlobListResult(Status.OK, currentMaxFile, bucket, finalPrefix);
        } catch (Exception e) {
            LOG.warn("Errors while getting file status", e);
            return new GlobListResult(new Status(Status.ErrCode.COMMON_ERROR,
-                    "Errors while getting file status " + Util.getRootCauseMessage(e)), "");
+                    "Errors while getting file status " + Util.getRootCauseMessage(e)));
        } finally {
            long endTime = System.nanoTime();
            long duration = endTime - startTime;
@@ -715,24 +712,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
return false;
}
- private static class GlobListResult {
- private final Status status;
- private final String maxFile;
-
- public GlobListResult(Status status, String maxFile) {
- this.status = status;
- this.maxFile = maxFile;
- }
-
- public Status getStatus() {
- return status;
- }
-
- public String getMaxFile() {
- return maxFile;
- }
- }
-
@Override
public synchronized void close() throws Exception {
if (client != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
index ac1f8add947..1f6f0225278 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
@@ -36,8 +36,6 @@ public class RemoteFile {
private long modificationTime;
private Path path;
BlockLocation[] blockLocations;
- private String parentPath;
- private String bucket;
public RemoteFile(String name, boolean isFile, long size, long blockSize) {
this(name, null, isFile, !isFile, size, blockSize, 0, null);
@@ -77,22 +75,6 @@ public class RemoteFile {
this.path = path;
}
- public String getBucket() {
- return bucket;
- }
-
- public void setBucket(String bucket) {
- this.bucket = bucket;
- }
-
- public String getParentPath() {
- return parentPath;
- }
-
- public void setParentPath(String parentPath) {
- this.parentPath = parentPath;
- }
-
public boolean isFile() {
return isFile;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index d4e5a23c20e..007b67de84d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.fs.GlobListResult;
import org.apache.doris.fs.obj.S3ObjStorage;
import org.apache.logging.log4j.LogManager;
@@ -69,7 +70,7 @@ public class S3FileSystem extends ObjFileSystem {
}
@Override
-    public String globListWithLimit(String remotePath, List<RemoteFile> result, String startFile,
+    public GlobListResult globListWithLimit(String remotePath, List<RemoteFile> result, String startFile,
            long fileSizeLimit, long fileNumLimit) {
        S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
        return objStorage.globListWithLimit(remotePath, result, startFile, fileSizeLimit, fileNumLimit);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index a58dddb55e7..5b03ae6d18b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -31,7 +31,6 @@ import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.persist.AlterStreamingJobOperationLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TCell;
@@ -218,6 +217,10 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
}
public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
+        if (JobExecuteType.STREAMING.equals(getJobConfig().getExecuteType())) {
+            taskType = TaskType.STREAMING;
+        }
+
if (!canCreateTask(taskType)) {
log.info("job is not ready for scheduling, job id is {},job status
is {}, taskType is {}", jobId,
jobStatus, taskType);
@@ -478,10 +481,6 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay delete scheduler job").build());
    }
-    public void onReplayUpdateStreaming(AlterStreamingJobOperationLog operationLog) {
-        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay update streaming job").build());
-    }
-
public boolean needPersist() {
return true;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index 23710c17659..35b1f351f72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -18,7 +18,6 @@
package org.apache.doris.job.executor;
import org.apache.doris.job.base.AbstractJob;
-import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.JobUtils;
import org.apache.doris.job.common.TaskType;
@@ -57,9 +56,7 @@ public class DispatchTaskHandler<T extends AbstractJob> implements WorkHandler<T
            return;
        }
        if (event.getJob().isReadyForScheduling(null) && JobUtils.checkNeedSchedule(event.getJob())) {
-            TaskType taskType = JobExecuteType.STREAMING.equals(event.getJob().getJobConfig().getExecuteType())
-                    ? TaskType.STREAMING : TaskType.SCHEDULED;
-            List<? extends AbstractTask> tasks = event.getJob().commonCreateTasks(taskType, null);
+            List<? extends AbstractTask> tasks = event.getJob().commonCreateTasks(TaskType.SCHEDULED, null);
            if (CollectionUtils.isEmpty(tasks)) {
                log.warn("job is ready for scheduling, but create task is empty, skip scheduler,"
                        + "job id is {}," + " job name is {}", event.getJob().getJobId(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index e8fd03c8de4..b32ecdcb94e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -46,7 +46,6 @@ import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
-import org.apache.doris.persist.AlterStreamingJobOperationLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
@@ -253,12 +252,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
        return offsetProvider.hasMoreDataToConsume();
    }

-    @Override
-    public void logUpdateOperation() {
-        AlterStreamingJobOperationLog log =
-                new AlterStreamingJobOperationLog(this.getJobId(), this.getJobStatus(), properties, getExecuteSql());
-        Env.getCurrentEnv().getEditLog().logUpdateStreamingJob(log);
-    }

    @Override
    public void onTaskFail(StreamingJobSchedulerTask task) throws JobException {
@@ -321,13 +314,16 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
        super.onReplayCreate();
    }

-    @Override
-    public void onReplayUpdateStreaming(AlterStreamingJobOperationLog operationLog) {
-        super.onReplayUpdateStreaming(operationLog);
-        setJobStatus(operationLog.getStatus());
-        this.properties = operationLog.getJobProperties();
+    /**
+     * Because the offset statistics of the StreamingInsertJob are all stored in the txn,
+     * only some fields are replayed here.
+     * @param replayJob the streaming job deserialized from the edit log
+     */
+    public void replayOnUpdated(StreamingInsertJob replayJob) {
+        setJobStatus(replayJob.getJobStatus());
+        this.properties = replayJob.getProperties();
        this.jobProperties = new StreamingJobProperties(properties);
-        setExecuteSql(operationLog.getExecuteSql());
+        setExecuteSql(replayJob.getExecuteSql());
    }
@Override
@@ -358,14 +354,14 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
        trow.addToColumnValue(new TCell().setStringVal(properties != null
                ? GsonUtils.GSON.toJson(properties) : FeConstants.null_string));

-        if (offsetProvider != null && offsetProvider.getSyncOffset() != null) {
-            trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getSyncOffset()));
+        if (offsetProvider != null && offsetProvider.getConsumedOffset() != null) {
+            trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getConsumedOffset()));
        } else {
            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
        }
-        if (offsetProvider != null && offsetProvider.getRemoteOffset() != null) {
-            trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getRemoteOffset()));
+        if (offsetProvider != null && offsetProvider.getMaxOffset() != null) {
+            trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getMaxOffset()));
        } else {
            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
        }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index a79bc1b230d..35aa601fceb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -32,44 +32,45 @@ import java.util.Map;
@Data
public class StreamingJobProperties implements JobProperties {
public static final String MAX_INTERVAL_SECOND_PROPERTY = "max_interval";
-    public static final String S3_BATCH_FILES_PROPERTY = "s3.batch_files";
-    public static final String S3_BATCH_SIZE_PROPERTY = "s3.batch_size";
+    public static final String S3_MAX_BATCH_FILES_PROPERTY = "s3.max_batch_files";
+    public static final String S3_MAX_BATCH_BYTES_PROPERTY = "s3.max_batch_bytes";
    public static final String SESSION_VAR_PREFIX = "session.";
    public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
-    public static final long DEFAULT_S3_BATCH_FILES = 256;
-    public static final long DEFAULT_S3_BATCH_SIZE = 10 * 1024 * 1024 * 1024L; // 10GB
+    public static final long DEFAULT_MAX_S3_BATCH_FILES = 256;
+    public static final long DEFAULT_MAX_S3_BATCH_BYTES = 10 * 1024 * 1024 * 1024L; // 10GB
public static final int DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min
private final Map<String, String> properties;
private long maxIntervalSecond;
private long s3BatchFiles;
- private long s3BatchSize;
+ private long s3BatchBytes;
public StreamingJobProperties(Map<String, String> jobProperties) {
this.properties = jobProperties;
if (properties.isEmpty()) {
this.maxIntervalSecond = DEFAULT_MAX_INTERVAL_SECOND;
- this.s3BatchFiles = DEFAULT_S3_BATCH_FILES;
- this.s3BatchSize = DEFAULT_S3_BATCH_SIZE;
+ this.s3BatchFiles = DEFAULT_MAX_S3_BATCH_FILES;
+ this.s3BatchBytes = DEFAULT_MAX_S3_BATCH_BYTES;
}
}
    public void validate() throws AnalysisException {
        this.maxIntervalSecond = Util.getLongPropertyOrDefault(
-                properties.get(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY),
-                StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND, (v) -> v >= 1,
+                        properties.get(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY),
+                        StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND, (v) -> v >= 1,
                StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY + " should > 1");
        this.s3BatchFiles = Util.getLongPropertyOrDefault(
-                properties.get(StreamingJobProperties.S3_BATCH_FILES_PROPERTY),
-                StreamingJobProperties.DEFAULT_S3_BATCH_FILES, (v) -> v >= 1,
-                StreamingJobProperties.S3_BATCH_FILES_PROPERTY + " should >=1 ");
+                        properties.get(StreamingJobProperties.S3_MAX_BATCH_FILES_PROPERTY),
+                        StreamingJobProperties.DEFAULT_MAX_S3_BATCH_FILES, (v) -> v >= 1,
+                StreamingJobProperties.S3_MAX_BATCH_FILES_PROPERTY + " should >=1 ");
-        this.s3BatchSize = Util.getLongPropertyOrDefault(properties.get(StreamingJobProperties.S3_BATCH_SIZE_PROPERTY),
-                StreamingJobProperties.DEFAULT_S3_BATCH_SIZE, (v) -> v >= 100 * 1024 * 1024
+        this.s3BatchBytes = Util.getLongPropertyOrDefault(
+                        properties.get(StreamingJobProperties.S3_MAX_BATCH_BYTES_PROPERTY),
+                        StreamingJobProperties.DEFAULT_MAX_S3_BATCH_BYTES, (v) -> v >= 100 * 1024 * 1024
                && v <= (long) (1024 * 1024 * 1024) * 10,
-                StreamingJobProperties.S3_BATCH_SIZE_PROPERTY + " should between 100MB and 10GB");
+                StreamingJobProperties.S3_MAX_BATCH_BYTES_PROPERTY + " should between 100MB and 10GB");
}
public SessionVariable getSessionVariable() throws JobException {
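
(A quick sketch of how the renamed knobs parse -- the map contents are hypothetical, and Util.getLongPropertyOrDefault is assumed to fall back to the supplied default when the key is absent, as its name suggests:

    Map<String, String> p = new HashMap<>();
    p.put("s3.max_batch_files", "1");              // only this key is set
    StreamingJobProperties jobProps = new StreamingJobProperties(p);
    jobProps.validate();
    // s3BatchFiles      -> 1 (from the map)
    // s3BatchBytes      -> DEFAULT_MAX_S3_BATCH_BYTES (10GB fallback)
    // maxIntervalSecond -> DEFAULT_MAX_INTERVAL_SECOND (10s fallback)

)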
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 14b12f837e0..8ce9fb7ea73 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -38,6 +38,7 @@ import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.scheduler.JobScheduler;
import org.apache.doris.job.scheduler.StreamingTaskScheduler;
@@ -45,7 +46,6 @@ import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.persist.AlterStreamingJobOperationLog;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
@@ -368,21 +368,16 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
            LOG.warn("replayUpdateJob not normal, job: {}, jobId: {}, jobMap: {}", job, jobId, jobMap);
            return;
        }
-        jobMap.put(jobId, job);
-        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, jobId)
-                .add("msg", "replay update scheduler job").build());
-    }
-    public void replayUpdateStreamingJob(AlterStreamingJobOperationLog log) {
-        Long jobId = log.getJobId();
-        if (!jobMap.containsKey(jobId)) {
-            LOG.warn("replayUpdateStreamingJob not normal, jobId: {}, jobMap: {}", jobId, log);
-            return;
+        if (job instanceof StreamingInsertJob) {
+            // for streaming jobs, only part of the properties are updated on replay
+            StreamingInsertJob currentJob = (StreamingInsertJob) jobMap.get(jobId);
+            currentJob.replayOnUpdated((StreamingInsertJob) job);
+        } else {
+            jobMap.put(jobId, job);
        }
-        T job = jobMap.get(jobId);
-        job.onReplayUpdateStreaming(log);
-        LOG.info(new LogBuilder(LogKey.SCHEDULER_JOB, jobId)
-                .add("msg", "replay update streaming job").build());
+        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, jobId)
+                .add("msg", "replay update scheduler job").build());
    }
/**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
index 03e664ccd3e..cebff9a39b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
@@ -19,4 +19,6 @@ package org.apache.doris.job.offset;
public interface Offset {
String toSerializedJson();
+
+ boolean isEmpty();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index d7e1bee4669..a27b66812c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -40,22 +40,16 @@ public interface SourceOffsetProvider {
    Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> properties);
/**
- * Get current offset
+     * Get the consumed offset to show
* @return
*/
- Offset getCurrentOffset();
+ String getConsumedOffset();
/**
- * Get sync offset to show
+     * Get the max offset of the remote data source
* @return
*/
- String getSyncOffset();
-
- /**
- * Get remote offset
- * @return
- */
- String getRemoteOffset();
+ String getMaxOffset();
/**
* Rewrite the TVF parameters in the SQL based on the current offset.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index cb783adfbff..b152660873a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -23,6 +23,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
@Getter
@Setter
@@ -39,6 +40,11 @@ public class S3Offset implements Offset {
return GsonUtils.GSON.toJson(this);
}
+ @Override
+ public boolean isEmpty() {
+ return StringUtils.isEmpty(fileLists);
+ }
+
@Override
public String toString() {
return "{ \"startFile\": \"" + startFile + "\", \"endFile\": \"" +
endFile + "\" }";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index d433e42f73c..e6a5cf809a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -19,6 +19,7 @@ package org.apache.doris.job.offset.s3;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.GlobListResult;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
@Log4j2
public class S3SourceOffsetProvider implements SourceOffsetProvider {
S3Offset currentOffset;
- String maxRemoteEndFile;
+ String maxEndFile;
@Override
public String getSourceType() {
@@ -64,18 +65,41 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
        try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) {
            String uri = storageProperties.validateAndGetUri(copiedProps);
            filePath = storageProperties.validateAndNormalizeUri(uri);
-            maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, startFile,
-                    jobProps.getS3BatchFiles(), jobProps.getS3BatchSize());
-            offset.setStartFile(startFile);
-            //todo: The path may be in the form of bucket/dir/*/*,
-            // but currently only the case where the last segment is * is handled.
+            GlobListResult globListResult = fileSystem.globListWithLimit(filePath, rfiles, startFile,
+                    jobProps.getS3BatchBytes(), jobProps.getS3BatchFiles());
+
            if (!rfiles.isEmpty()) {
-                String bucket = rfiles.get(0).getBucket();
-                String parentPath = rfiles.get(0).getParentPath();
-                String filePaths = rfiles.stream().map(RemoteFile::getName).collect(Collectors.joining(",", "{", "}"));
-                String finalFiles = String.format("s3://%s/%s/%s", bucket, parentPath, filePaths);
-                offset.setEndFile(String.format("%s/%s", parentPath, rfiles.get(rfiles.size() - 1).getName()));
-                offset.setFileLists(finalFiles);
+                String bucket = globListResult.getBucket();
+                String prefix = globListResult.getPrefix();
+
+                offset.setStartFile(startFile);
+                String bucketBase = "s3://" + bucket + "/";
+                // Strip the last path segment to get the base directory prefix
+                int lastSlash = prefix.lastIndexOf('/');
+                String basePrefix = (lastSlash >= 0) ? prefix.substring(0, lastSlash + 1) : "";
+                String filePathBase = bucketBase + basePrefix;
+                String joined = rfiles.stream()
+                        .filter(file -> !file.getName().equals(filePathBase)) // Single file case
+                        .map(file -> file.getName().replace(filePathBase, ""))
+                        .collect(Collectors.joining(","));
+
+                if (joined.isEmpty()) {
+                    // base is a single file
+                    offset.setFileLists(filePathBase);
+                    String lastFile = rfiles.get(rfiles.size() - 1).getName().replace(bucketBase, "");
+                    offset.setEndFile(lastFile);
+                } else {
+                    // base is a directory
+                    String normalizedPrefix = basePrefix.endsWith("/")
+                            ? basePrefix.substring(0, basePrefix.length() - 1) : basePrefix;
+                    String finalFileLists = String.format("s3://%s/%s/{%s}", bucket, normalizedPrefix, joined);
+                    String lastFile = rfiles.get(rfiles.size() - 1).getName().replace(bucketBase, "");
+                    offset.setFileLists(finalFileLists);
+                    offset.setEndFile(lastFile);
+                }
+                maxEndFile = globListResult.getMaxFile();
+            } else {
+                throw new RuntimeException("No new files found in path: " + filePath);
            }
} catch (Exception e) {
log.warn("list path exception, path={}", filePath, e);
@@ -85,12 +109,7 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
}
@Override
- public Offset getCurrentOffset() {
- return currentOffset;
- }
-
- @Override
- public String getSyncOffset() {
+ public String getConsumedOffset() {
if (currentOffset != null) {
return currentOffset.getEndFile();
}
@@ -98,8 +117,8 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
}
@Override
- public String getRemoteOffset() {
- return maxRemoteEndFile;
+ public String getMaxOffset() {
+ return maxEndFile;
}
@Override
@@ -138,11 +157,11 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
String uri = storageProperties.validateAndGetUri(copiedProps);
String filePath = storageProperties.validateAndNormalizeUri(uri);
List<RemoteFile> objects = new ArrayList<>();
-            String endFile = fileSystem.globListWithLimit(filePath, objects, startFile, 1, 1);
- if (!objects.isEmpty() && StringUtils.isNotEmpty(endFile)) {
- maxRemoteEndFile = endFile;
+            GlobListResult globListResult = fileSystem.globListWithLimit(filePath, objects, startFile, 1, 1);
+            if (globListResult != null && !objects.isEmpty() && StringUtils.isNotEmpty(globListResult.getMaxFile())) {
+ maxEndFile = globListResult.getMaxFile();
} else {
- maxRemoteEndFile = startFile;
+ maxEndFile = startFile;
}
} catch (Exception e) {
throw e;
@@ -154,7 +173,7 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
if (currentOffset == null) {
return true;
}
- if (currentOffset.endFile.compareTo(maxRemoteEndFile) < 0) {
+ if (currentOffset.endFile.compareTo(maxEndFile) < 0) {
return true;
}
return false;
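
(To make the new offset bookkeeping concrete, a small worked example with hypothetical values: suppose the glob resolves against bucket my-bucket with prefix load/data/*.csv and the listing returns a.csv and b.csv. Then basePrefix is "load/data/", the relative names join to "a.csv,b.csv", and the provider records:

    fileLists  = s3://my-bucket/load/data/{a.csv,b.csv}   // fed back into the TVF rewrite
    endFile    = load/data/b.csv                          // consumed offset, bucket-relative
    maxEndFile = globListResult.getMaxFile()              // newest file seen remotely

The brace-expansion form of fileLists is also why HdfsPropertiesUtils above can no longer assume URI.create will parse every incoming path.)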
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index f0f9d471ed9..cb6dee000de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -67,7 +67,6 @@ import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
import org.apache.doris.persist.AlterMTMV;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
-import org.apache.doris.persist.AlterStreamingJobOperationLog;
import org.apache.doris.persist.AlterUserOperationLog;
import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.AnalyzeDeletionLog;
@@ -551,11 +550,6 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
- case OperationType.OP_UPDATE_STREAMING_JOB: {
- data = AlterStreamingJobOperationLog.read(in);
- isRead = true;
- break;
- }
case OperationType.OP_CREATE_SCHEDULER_TASK:
case OperationType.OP_DELETE_SCHEDULER_TASK: {
//todo improve
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index adde81fbfe9..8d5bd244df7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -41,9 +41,9 @@ import java.util.Objects;
public class AlterJobCommand extends AlterCommand implements ForwardWithSync {
// exclude job name prefix, which is used by inner job
private static final String excludeJobNamePrefix = "inner_";
- private String jobName;
- private Map<String, String> properties;
- private String sql;
+ private final String jobName;
+ private final Map<String, String> properties;
+ private final String sql;
    public AlterJobCommand(String jobName, Map<String, String> properties, String sql) {
super(PlanType.ALTER_JOB_COMMAND);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
index 924f2bd4842..fecac5e135c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
@@ -283,8 +283,8 @@ public class CreateJobInfo {
throw new AnalysisException(e.getMessage());
}
} else {
- throw new AnalysisException("Not support this sql : " + sql + "
Command class is "
- + logicalPlan.getClass().getName() + ".");
+ throw new AnalysisException("Only " +
logicalPlan.getClass().getName()
+ + " is supported to use with streaming job together");
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 6f7cec71857..9e5c54099b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -79,7 +79,6 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -544,24 +543,8 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
}
}
- // todo: add ut
public List<UnboundTVFRelation> getAllTVFRelation() {
- List<UnboundTVFRelation> tvfs = new ArrayList<>();
- findAllTVFInPlan(getLogicalQuery(), tvfs);
- return tvfs;
- }
-
-    private void findAllTVFInPlan(LogicalPlan plan, List<UnboundTVFRelation> tvfs) {
- if (plan instanceof UnboundTVFRelation) {
- UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
- tvfs.add(tvfRelation);
- }
-
- for (Plan child : plan.children()) {
- if (child instanceof LogicalPlan) {
- findAllTVFInPlan((LogicalPlan) child, tvfs);
- }
- }
+        return getLogicalQuery().collectToList(UnboundTVFRelation.class::isInstance);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterStreamingJobOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterStreamingJobOperationLog.java
deleted file mode 100644
index 785531fa0d1..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterStreamingJobOperationLog.java
+++ /dev/null
@@ -1,86 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.persist;
-
-import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
-import org.apache.doris.job.common.JobStatus;
-import org.apache.doris.persist.gson.GsonUtils;
-
-import com.google.gson.annotations.SerializedName;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-
-public class AlterStreamingJobOperationLog implements Writable {
- @SerializedName(value = "jid")
- private long jobId;
- @SerializedName(value = "js")
- private JobStatus status;
- @SerializedName(value = "jp")
- private Map<String, String> jobProperties;
- @SerializedName(value = "sql")
- String executeSql;
-
- public AlterStreamingJobOperationLog(long jobId, JobStatus status,
- Map<String, String> jobProperties, String executeSql) {
- this.jobId = jobId;
- this.status = status;
- this.jobProperties = jobProperties;
- this.executeSql = executeSql;
- }
-
- public long getJobId() {
- return jobId;
- }
-
- public Map<String, String> getJobProperties() {
- return jobProperties;
- }
-
- public String getExecuteSql() {
- return executeSql;
- }
-
- public JobStatus getStatus() {
- return status;
- }
-
-    public static AlterStreamingJobOperationLog read(DataInput in) throws IOException {
- String json = Text.readString(in);
-        return GsonUtils.GSON.fromJson(json, AlterStreamingJobOperationLog.class);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- String json = GsonUtils.GSON.toJson(this);
- Text.writeString(out, json);
- }
-
- @Override
- public String toString() {
- return "AlterStreamingJobOperationLog{"
- + "jobId=" + jobId
- + ", status=" + status
- + ", jobProperties=" + jobProperties
- + ", executeSql='" + executeSql + '\''
- + '}';
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 74be0f5e475..5abcc4e0464 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -862,11 +862,6 @@ public class EditLog {
Env.getCurrentEnv().getJobManager().replayDeleteJob(job);
break;
}
-            case OperationType.OP_UPDATE_STREAMING_JOB: {
-                AlterStreamingJobOperationLog log = (AlterStreamingJobOperationLog) journal.getData();
-                Env.getCurrentEnv().getJobManager().replayUpdateStreamingJob(log);
-                break;
-            }
/*case OperationType.OP_CREATE_SCHEDULER_TASK: {
JobTask task = (JobTask) journal.getData();
Env.getCurrentEnv().getJobTaskManager().replayCreateTask(task);
@@ -2043,10 +2038,6 @@ public class EditLog {
logEdit(OperationType.OP_UPDATE_SCHEDULER_JOB, job);
}
- public void logUpdateStreamingJob(AlterStreamingJobOperationLog log) {
- logEdit(OperationType.OP_UPDATE_STREAMING_JOB, log);
- }
-
public void logDeleteJob(AbstractJob job) {
logEdit(OperationType.OP_DELETE_SCHEDULER_JOB, job);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 12d59d086b9..2bbc03d4aea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -414,8 +414,6 @@ public class OperationType {
public static final short OP_OPERATE_KEY = 492;
- public static final short OP_UPDATE_STREAMING_JOB = 493;
-
// For cloud.
public static final short OP_UPDATE_CLOUD_REPLICA = 1000;
@Deprecated
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 6c8506af7d7..d5262d36a96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4761,6 +4761,7 @@ public class SessionVariable implements Serializable, Writable {
field.set(this, root.get(attr.name()));
break;
case "double":
+                        // root.get(attr.name()) always returns a Double, so it needs to be converted.
                        field.set(this, Double.valueOf(root.get(attr.name()).toString()));
                        break;
case "String":
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index d89bc8d2f0f..54fc5faa2e8 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -46,7 +46,7 @@ suite("test_streaming_insert_job") {
sql """
CREATE JOB ${jobName}
PROPERTIES(
- "s3.batch_files" = "1"
+ "s3.max_batch_files" = "1"
)
ON STREAMING DO INSERT INTO ${tableName}
SELECT * FROM S3
@@ -104,9 +104,9 @@ suite("test_streaming_insert_job") {
// alter streaming job
sql """
- ALTER JOB FOR ${jobName}
+ ALTER JOB ${jobName}
PROPERTIES(
- "session.insert_max_filter_ratio" = "0.5"
+ "session.insert_max_filter_ratio" = 0.5
)
INSERT INTO ${tableName}
SELECT * FROM S3
@@ -126,7 +126,7 @@ suite("test_streaming_insert_job") {
            select status,properties,ConsumedOffset from jobs("type"="insert") where Name='${jobName}'
"""
assert alterJobProperties.get(0).get(0) == "PAUSED"
-        assert alterJobProperties.get(0).get(1) == "{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+        assert alterJobProperties.get(0).get(1) == "{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
        assert alterJobProperties.get(0).get(2) == "regression/load/data/example_1.csv"
sql """
@@ -136,7 +136,7 @@ suite("test_streaming_insert_job") {
            select status,properties,ConsumedOffset from jobs("type"="insert") where Name='${jobName}'
"""
        assert resumeJobStatus.get(0).get(0) == "RUNNING" || resumeJobStatus.get(0).get(0) == "PENDING"
-        assert resumeJobStatus.get(0).get(1) == "{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+        assert resumeJobStatus.get(0).get(1) == "{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
        assert resumeJobStatus.get(0).get(2) == "regression/load/data/example_1.csv"
Awaitility.await().atMost(60, SECONDS)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]