This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 808c9cd6bec [Improve](spark-load)update spark version for spark load to resolve cve problem (#30368) (#33085)
808c9cd6bec is described below

commit 808c9cd6bec83f3e8b21ce018adaa0626c897d60
Author: gnehil <[email protected]>
AuthorDate: Mon Apr 1 15:28:20 2024 +0800

    [Improve](spark-load)update spark version for spark load to resolve cve problem (#30368) (#33085)
    
    (cherry picked from commit 06801d5518b20d1a7ff26a271962614ad5c78d8e)
---
 fe/pom.xml                                                    |  2 +-
 .../main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java  | 11 +++--------
 2 files changed, 4 insertions(+), 9 deletions(-)

diff --git a/fe/pom.xml b/fe/pom.xml
index 89a7af17364..796dbb3ff1f 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -291,7 +291,7 @@ under the License.
         <javax.activation.version>1.2.0</javax.activation.version>
         <jaxws-api.version>2.3.0</jaxws-api.version>
         <RoaringBitmap.version>0.8.13</RoaringBitmap.version>
-        <spark.version>2.4.6</spark.version>
+        <spark.version>3.4.1</spark.version>
         <hive.version>3.1.3</hive.version>
         <hive.common.version>2.3.9</hive.common.version>
         <nimbusds.version>9.35</nimbusds.version>
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 33ca13cb0e4..a5b3e33ab7d 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -47,8 +47,6 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
@@ -190,7 +188,6 @@ public final class SparkDpp implements java.io.Serializable {
         // TODO(wb) should deal largeint as BigInteger instead of string when using biginteger as key,
         // data type may affect sorting logic
         StructType dstSchema = DppUtils.createDstTableSchema(indexMeta.columns, false, true);
-        ExpressionEncoder encoder = RowEncoder.apply(dstSchema);
 
         resultRDD.repartitionAndSortWithinPartitions(new BucketPartitioner(bucketKeyMap), new BucketComparator())
                 .foreachPartition((VoidFunction<Iterator<Tuple2<List<Object>, Object[]>>>) t -> {
@@ -254,15 +251,13 @@ public final class SparkDpp implements java.io.Serializable {
                             conf.set("spark.sql.parquet.outputTimestampType", "INT96");
                             ParquetWriteSupport.setSchema(dstSchema, conf);
                             ParquetWriteSupport parquetWriteSupport = new ParquetWriteSupport();
-                            parquetWriter = new ParquetWriter<InternalRow>(new Path(tmpPath), parquetWriteSupport,
+                            parquetWriter = new ParquetWriter<>(new Path(tmpPath), parquetWriteSupport,
                                     CompressionCodecName.SNAPPY, 256 * 1024 * 1024, 16 * 1024, 1024 * 1024, true, false,
                                     WriterVersion.PARQUET_1_0, conf);
-                            if (parquetWriter != null) {
-                                LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
-                            }
+                            LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
                             lastBucketKey = curBucketKey;
                         }
-                        InternalRow internalRow = encoder.toRow(rowWithoutBucketKey);
+                        InternalRow internalRow = InternalRow.apply(rowWithoutBucketKey.toSeq());
                         parquetWriter.write(internalRow);
                     }
                     if (parquetWriter != null) {
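
For context on the SparkDpp.java change: ExpressionEncoder no longer exposes toRow() in Spark 3.x (row serialization moved to a separate API), so the commit builds the catalyst InternalRow directly from the Row's values instead of going through RowEncoder.apply(dstSchema). A minimal, illustrative sketch of that pattern, assuming Spark 3.4.x on the classpath; the class name and row contents below are hypothetical stand-ins, not code from the repository:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.InternalRow;

public class RowConversionSketch {
    public static void main(String[] args) {
        // Stand-in for rowWithoutBucketKey; the real rows carry the dstSchema columns.
        Row row = RowFactory.create(1, 2L, 3.0d);

        // Spark 2.x (no longer compiles on 3.x):
        //   InternalRow internalRow = RowEncoder.apply(schema).toRow(row);
        // Spark 3.x replacement used in the diff: wrap the Row's values in a
        // GenericInternalRow via the InternalRow companion object.
        InternalRow internalRow = InternalRow.apply(row.toSeq());

        System.out.println(internalRow.numFields()); // prints 3
    }
}

Note that InternalRow.apply stores the values as given rather than converting them to Catalyst's internal representations, so the surrounding code must ensure the values are already in the form the downstream ParquetWriter expects.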


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]