This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c9d6b2e9b5b [feature](merge-cloud) Add cloud copy load job meta
definition (#32182)
c9d6b2e9b5b is described below
commit c9d6b2e9b5b239a7b4e6e0c09782db9b95ad530a
Author: walter <[email protected]>
AuthorDate: Wed Mar 13 23:29:10 2024 +0800
[feature](merge-cloud) Add cloud copy load job meta definition (#32182)
Co-authored-by: meiyi <[email protected]>
Co-authored-by: Lei Zhang <[email protected]>
Co-authored-by: Gavin Chou <[email protected]>
Co-authored-by: Luwei <[email protected]>
---
.../org/apache/doris/cloud/load/CloudCopyJob.java | 106 +++++++++++++++++++++
.../java/org/apache/doris/load/EtlJobType.java | 1 +
.../apache/doris/load/loadv2/BrokerLoadJob.java | 4 +
.../java/org/apache/doris/load/loadv2/LoadJob.java | 3 +
4 files changed, 114 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudCopyJob.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudCopyJob.java
new file mode 100644
index 00000000000..c3302901072
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudCopyJob.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.load;
+
+import org.apache.doris.cloud.proto.Cloud.StagePB;
+import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.BrokerLoadJob;
+import org.apache.doris.load.loadv2.JobState;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import lombok.Getter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CloudCopyJob extends BrokerLoadJob {
+ private static final Logger LOG = LogManager.getLogger(CloudCopyJob.class);
+ private static final String TABLE_NAME_KEY = "TableName";
+ private static final String USER_NAME_KEY = "UserName";
+
+ @Getter
+ private String stageId;
+ @Getter
+ private StagePB.StageType stageType;
+ @Getter
+ private String stagePrefix;
+ @Getter
+ private long sizeLimit;
+ @Getter
+ private String pattern;
+ @Getter
+ private ObjectInfo objectInfo;
+ @Getter
+ private String copyId;
+ @Getter
+ private boolean forceCopy;
+ private String loadFilePaths = "";
+ private Map<String, String> properties = new HashMap<>();
+ private volatile boolean abortedCopy = false;
+ private boolean isReplay = false;
+ private List<String> loadFiles = null;
+
+ public CloudCopyJob() {
+ super(EtlJobType.COPY);
+ }
+
+ @Override
+ public void cancelJob(FailMsg failMsg) throws DdlException {
+ super.cancelJob(failMsg);
+ loadFiles = null;
+ abortedCopy = true;
+ }
+
+ public void setAbortedCopy(boolean abortedCopy) {
+ this.abortedCopy = abortedCopy;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ Text.writeString(out, copyId);
+ Text.writeString(out, loadFilePaths);
+ Gson gson = new Gson();
+ Text.writeString(out, properties == null ? "" :
gson.toJson(properties));
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ copyId = Text.readString(in);
+ loadFilePaths = Text.readString(in);
+ String property = Text.readString(in);
+ properties = property.isEmpty() ? new HashMap<>()
+ : (new Gson().fromJson(property, new TypeToken<Map<String,
String>>() {
+ }.getType()));
+
+ // FIXME: COPY JOB is not supported yet.
+ state = JobState.CANCELLED;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/EtlJobType.java
b/fe/fe-core/src/main/java/org/apache/doris/load/EtlJobType.java
index 2770738d3d7..95333d0f025 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/EtlJobType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/EtlJobType.java
@@ -24,6 +24,7 @@ public enum EtlJobType {
BROKER,
DELETE,
SPARK,
+ COPY,
LOCAL_FILE,
// create by job scheduler,inner use
INSERT_JOB,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 1f24484be5f..abf0500b90e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -90,6 +90,10 @@ public class BrokerLoadJob extends BulkLoadJob {
super(EtlJobType.BROKER);
}
+ protected BrokerLoadJob(EtlJobType type) {
+ super(type);
+ }
+
public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc,
OriginStatement originStmt, UserIdentity userInfo)
throws MetaNotFoundException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index c42295a1322..c15904be8ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.cloud.load.CloudCopyJob;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -766,6 +767,8 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
job = new InsertLoadJob();
} else if (type == EtlJobType.MINI) {
job = new MiniLoadJob();
+ } else if (type == EtlJobType.COPY) {
+ job = new CloudCopyJob();
} else {
throw new IOException("Unknown load type: " + type.name());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]