This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 799077eec55f24277ba9819f0aaa09f58d9783f5
Author: Lightman <[email protected]>
AuthorDate: Mon Apr 22 19:33:59 2024 +0800

    (cloud-merge) Fix the failed cast when RollupJobV2 is converted to 
CloudRollupJobV2 (#33847)
---
 .../org/apache/doris/alter/AlterJobV2Factory.java  |  3 +-
 .../org/apache/doris/alter/CloudRollupJobV2.java   | 32 +++++++++++--------
 .../apache/doris/alter/CloudSchemaChangeJobV2.java | 36 ++++++++++++----------
 3 files changed, 40 insertions(+), 31 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2Factory.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2Factory.java
index d5c937fd181..ef3c41d0aa1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2Factory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2Factory.java
@@ -26,7 +26,6 @@ import org.apache.doris.qe.OriginStatement;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
 import java.util.List;
 
 public class AlterJobV2Factory {
@@ -69,7 +68,7 @@ public class AlterJobV2Factory {
                 }
             }
             return job;
-        } catch (IOException e) {
+        } catch (IllegalAccessException e) {
             throw new AnalysisException(e.getMessage());
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
index 4bc34582fff..51c4abde769 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
@@ -41,29 +41,35 @@ import org.apache.doris.thrift.TTabletType;
 import org.apache.doris.thrift.TTaskType;
 
 import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
 public class CloudRollupJobV2 extends RollupJobV2 {
     private static final Logger LOG = 
LogManager.getLogger(CloudRollupJobV2.class);
 
-    public static AlterJobV2 buildCloudRollupJobV2(RollupJobV2 job) throws 
IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        job.write(dos);
-        ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
-        DataInputStream dis = new DataInputStream(bais);
-        CloudRollupJobV2 ret = (CloudRollupJobV2) CloudRollupJobV2.read(dis);
-        ret.partitionIdToRollupIndex = job.partitionIdToRollupIndex;
+    public static AlterJobV2 buildCloudRollupJobV2(RollupJobV2 job) throws 
IllegalAccessException {
+        CloudRollupJobV2 ret = new CloudRollupJobV2();
+        List<Field> allFields = new ArrayList<>();
+        Class tmpClass = RollupJobV2.class;
+        while (tmpClass != null) {
+            allFields.addAll(Arrays.asList(tmpClass.getDeclaredFields()));
+            tmpClass = tmpClass.getSuperclass();
+        }
+        for (Field field : allFields) {
+            field.setAccessible(true);
+            Annotation annotation = field.getAnnotation(SerializedName.class);
+            if (annotation != null) {
+                field.set(ret, field.get(job));
+            }
+        }
         return ret;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
index cdfcf0a5e20..22ef8c0d619 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
@@ -39,14 +39,14 @@ import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.thrift.TTaskType;
 
 import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -54,17 +54,21 @@ import java.util.stream.Collectors;
 public class CloudSchemaChangeJobV2 extends SchemaChangeJobV2 {
     private static final Logger LOG = 
LogManager.getLogger(SchemaChangeJobV2.class);
 
-    public static AlterJobV2 buildCloudSchemaChangeJobV2(SchemaChangeJobV2 
job) throws IOException {
-        // deep copy to save repeated assignments
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        job.write(dos);
-        ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
-        DataInputStream dis = new DataInputStream(bais);
-        // partitionIndexMap cannot be deep-copied because it is referenced
-        // by `SchemaChangeJobV2#addShadowIndexToCatalog` and 
`SchemaChangeHandler.createJob`
-        CloudSchemaChangeJobV2 ret = (CloudSchemaChangeJobV2) 
CloudSchemaChangeJobV2.read(dis);
-        ret.partitionIndexMap = job.partitionIndexMap;
+    public static AlterJobV2 buildCloudSchemaChangeJobV2(SchemaChangeJobV2 
job) throws IllegalAccessException {
+        CloudSchemaChangeJobV2 ret = new CloudSchemaChangeJobV2();
+        List<Field> allFields = new ArrayList<>();
+        Class tmpClass = SchemaChangeJobV2.class;
+        while (tmpClass != null) {
+            allFields.addAll(Arrays.asList(tmpClass.getDeclaredFields()));
+            tmpClass = tmpClass.getSuperclass();
+        }
+        for (Field field : allFields) {
+            field.setAccessible(true);
+            Annotation annotation = field.getAnnotation(SerializedName.class);
+            if (annotation != null) {
+                field.set(ret, field.get(job));
+            }
+        }
         return ret;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to