This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c9d6b2e9b5b [feature](merge-cloud) Add cloud copy load job meta definition (#32182)
c9d6b2e9b5b is described below

commit c9d6b2e9b5b239a7b4e6e0c09782db9b95ad530a
Author: walter <[email protected]>
AuthorDate: Wed Mar 13 23:29:10 2024 +0800

    [feature](merge-cloud) Add cloud copy load job meta definition (#32182)
    
    Co-authored-by: meiyi <[email protected]>
    Co-authored-by: Lei Zhang <[email protected]>
    Co-authored-by: Gavin Chou <[email protected]>
    Co-authored-by: Luwei <[email protected]>
---
 .../org/apache/doris/cloud/load/CloudCopyJob.java  | 106 +++++++++++++++++++++
 .../java/org/apache/doris/load/EtlJobType.java     |   1 +
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   4 +
 .../java/org/apache/doris/load/loadv2/LoadJob.java |   3 +
 4 files changed, 114 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudCopyJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudCopyJob.java
new file mode 100644
index 00000000000..c3302901072
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudCopyJob.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.load;
+
+import org.apache.doris.cloud.proto.Cloud.StagePB;
+import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.BrokerLoadJob;
+import org.apache.doris.load.loadv2.JobState;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import lombok.Getter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CloudCopyJob extends BrokerLoadJob { // load-job metadata for EtlJobType.COPY jobs
+    private static final Logger LOG = LogManager.getLogger(CloudCopyJob.class);
+    private static final String TABLE_NAME_KEY = "TableName"; // not referenced in this class
+    private static final String USER_NAME_KEY = "UserName"; // not referenced in this class
+
+    @Getter
+    private String stageId; // not serialized by write()
+    @Getter
+    private StagePB.StageType stageType; // not serialized by write()
+    @Getter
+    private String stagePrefix; // not serialized by write()
+    @Getter
+    private long sizeLimit; // not serialized by write()
+    @Getter
+    private String pattern; // not serialized by write()
+    @Getter
+    private ObjectInfo objectInfo; // not serialized by write()
+    @Getter
+    private String copyId; // serialized by write(), restored by readFields()
+    @Getter
+    private boolean forceCopy; // not serialized by write()
+    private String loadFilePaths = ""; // serialized by write(), restored by readFields()
+    private Map<String, String> properties = new HashMap<>(); // serialized as a JSON string
+    private volatile boolean abortedCopy = false; // set by cancelJob() and setAbortedCopy()
+    private boolean isReplay = false; // never reassigned in this class
+    private List<String> loadFiles = null; // only ever reset to null (in cancelJob()) in this class
+
+    public CloudCopyJob() { // no-arg form used by LoadJob when deserializing an EtlJobType.COPY job
+        super(EtlJobType.COPY);
+    }
+
+    @Override
+    public void cancelJob(FailMsg failMsg) throws DdlException { // parent cancel first, then local cleanup
+        super.cancelJob(failMsg); // if this throws, the two assignments below are skipped
+        loadFiles = null;
+        abortedCopy = true;
+    }
+
+    public void setAbortedCopy(boolean abortedCopy) { // plain setter for the volatile abortedCopy flag
+        this.abortedCopy = abortedCopy;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException { // parent fields, then copyId, loadFilePaths, properties
+        super.write(out);
+        Text.writeString(out, copyId); // NOTE(review): copyId has no default; confirm Text.writeString accepts null
+        Text.writeString(out, loadFilePaths);
+        Gson gson = new Gson();
+        Text.writeString(out, properties == null ? "" : gson.toJson(properties));
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException { // reads fields in the same order write() emits them
+        super.readFields(in);
+        copyId = Text.readString(in);
+        loadFilePaths = Text.readString(in);
+        String property = Text.readString(in); // "" is what write() emits when properties is null
+        properties = property.isEmpty() ? new HashMap<>()
+                : (new Gson().fromJson(property, new TypeToken<Map<String, String>>() {
+                }.getType()));
+
+        // FIXME: COPY JOB is not supported yet, so every deserialized job is forced to CANCELLED.
+        state = JobState.CANCELLED;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/EtlJobType.java b/fe/fe-core/src/main/java/org/apache/doris/load/EtlJobType.java
index 2770738d3d7..95333d0f025 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/EtlJobType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/EtlJobType.java
@@ -24,6 +24,7 @@ public enum EtlJobType {
     BROKER,
     DELETE,
     SPARK,
+    COPY,
     LOCAL_FILE,
     // create by job scheduler,inner use
     INSERT_JOB,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 1f24484be5f..abf0500b90e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -90,6 +90,10 @@ public class BrokerLoadJob extends BulkLoadJob {
         super(EtlJobType.BROKER);
     }
 
+    protected BrokerLoadJob(EtlJobType type) { // lets a subclass (CloudCopyJob passes COPY) choose the job type
+        super(type);
+    }
+
     public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc,
                          OriginStatement originStmt, UserIdentity userInfo)
             throws MetaNotFoundException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index c42295a1322..c15904be8ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.AuthorizationInfo;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.cloud.load.CloudCopyJob;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -766,6 +767,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             job = new InsertLoadJob();
         } else if (type == EtlJobType.MINI) {
             job = new MiniLoadJob();
+        } else if (type == EtlJobType.COPY) {
+            job = new CloudCopyJob();
         } else {
             throw new IOException("Unknown load type: " + type.name());
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to