This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 808c9cd6bec [Improve](spark-load)update spark version for spark load to resolve cve problem (#30368) (#33085)
808c9cd6bec is described below
commit 808c9cd6bec83f3e8b21ce018adaa0626c897d60
Author: gnehil <[email protected]>
AuthorDate: Mon Apr 1 15:28:20 2024 +0800
    [Improve](spark-load)update spark version for spark load to resolve cve problem (#30368) (#33085)
(cherry picked from commit 06801d5518b20d1a7ff26a271962614ad5c78d8e)
---
fe/pom.xml | 2 +-
.../main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java | 11 +++--------
2 files changed, 4 insertions(+), 9 deletions(-)
diff --git a/fe/pom.xml b/fe/pom.xml
index 89a7af17364..796dbb3ff1f 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -291,7 +291,7 @@ under the License.
         <javax.activation.version>1.2.0</javax.activation.version>
         <jaxws-api.version>2.3.0</jaxws-api.version>
         <RoaringBitmap.version>0.8.13</RoaringBitmap.version>
-        <spark.version>2.4.6</spark.version>
+        <spark.version>3.4.1</spark.version>
         <hive.version>3.1.3</hive.version>
         <hive.common.version>2.3.9</hive.common.version>
         <nimbusds.version>9.35</nimbusds.version>
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 33ca13cb0e4..a5b3e33ab7d 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -47,8 +47,6 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
@@ -190,7 +188,6 @@ public final class SparkDpp implements java.io.Serializable {
         // TODO(wb) should deal largeint as BigInteger instead of string when using biginteger as key,
         // data type may affect sorting logic
         StructType dstSchema = DppUtils.createDstTableSchema(indexMeta.columns, false, true);
-        ExpressionEncoder encoder = RowEncoder.apply(dstSchema);
         resultRDD.repartitionAndSortWithinPartitions(new BucketPartitioner(bucketKeyMap), new BucketComparator())
                 .foreachPartition((VoidFunction<Iterator<Tuple2<List<Object>, Object[]>>>) t -> {
@@ -254,15 +251,13 @@ public final class SparkDpp implements java.io.Serializable {
                             conf.set("spark.sql.parquet.outputTimestampType", "INT96");
                             ParquetWriteSupport.setSchema(dstSchema, conf);
                             ParquetWriteSupport parquetWriteSupport = new ParquetWriteSupport();
-                            parquetWriter = new ParquetWriter<InternalRow>(new Path(tmpPath), parquetWriteSupport,
+                            parquetWriter = new ParquetWriter<>(new Path(tmpPath), parquetWriteSupport,
                                     CompressionCodecName.SNAPPY, 256 * 1024 * 1024, 16 * 1024, 1024 * 1024, true, false,
                                     WriterVersion.PARQUET_1_0, conf);
-                            if (parquetWriter != null) {
-                                LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
-                            }
+                            LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
                             lastBucketKey = curBucketKey;
                         }
-                        InternalRow internalRow = encoder.toRow(rowWithoutBucketKey);
+                        InternalRow internalRow = InternalRow.apply(rowWithoutBucketKey.toSeq());
                         parquetWriter.write(internalRow);
                     }
                     if (parquetWriter != null) {
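
Context for the SparkDpp change above: Spark 3.x removed ExpressionEncoder.toRow, and with it the RowEncoder.apply + toRow pattern this class relied on, so the patch builds the InternalRow directly from the Row's values. Below is a minimal standalone sketch of the before/after, assuming Spark 3.4.x on the classpath; the class name and sample row are illustrative only, not part of the patch.

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.catalyst.InternalRow;

    public class RowConversionSketch {
        public static void main(String[] args) {
            Row row = RowFactory.create(1, "doris");

            // Spark 2.x pattern, removed in Spark 3.x:
            //   ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
            //   InternalRow internalRow = encoder.toRow(row);

            // Spark 3.x pattern used by the patch: wrap the Row's values directly.
            // Note: this yields a GenericInternalRow holding the original Java
            // objects, skipping the encoder's Catalyst type conversions.
            InternalRow internalRow = InternalRow.apply(row.toSeq());
            System.out.println(internalRow.numFields()); // prints 2
        }
    }

The dropped null check after new ParquetWriter<>(...) is safe to remove because a constructor either returns a non-null object or throws; the success log line is kept unconditionally.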
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]