This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit dc6517c4a169224f81a82d1ff713df1c00a6203b
Author: HonestManXin <[email protected]>
AuthorDate: Thu Jul 20 14:36:00 2023 +0800

     [fix](sparkdpp) Hive table properties do not take effect when creating the 
Spark session (#21881)
    
    A Hive external table created for Spark load already includes related 
information such as the Hive Metastore. Even so, submitting a load job 
currently requires a hive-site.xml file in the Spark conf directory; without 
it, the Spark job may fail with an error message indicating that the 
corresponding Hive table cannot be found.
    
    The SparkEtlJob.initSparkConfigs method sets the properties of the external 
table into the Spark conf. However, at this point, the Spark session has 
already been created, and the Hive-related parameters will not take effect. To 
ensure that the Spark Hive catalog properly loads Hive tables, you need to set 
the Hive-related parameters before creating the Spark session.
    
    Co-authored-by: zhangshixin <[email protected]>
---
 .../apache/doris/load/loadv2/etl/SparkEtlJob.java  | 29 +++++++---
 .../doris/load/loadv2/etl/SparkEtlJobTest.java     | 63 ++++++++++++++++++----
 2 files changed, 73 insertions(+), 19 deletions(-)

diff --git 
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java 
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
index 34f4c0a8c3..484ff4eca2 100644
--- 
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
+++ 
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
@@ -30,14 +30,21 @@ import org.apache.doris.sparkdpp.EtlJobConfig.EtlTable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.io.CharStreams;
 import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
-import org.apache.spark.sql.Dataset;
+import org.apache.spark.deploy.SparkHadoopUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.functions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -62,6 +69,7 @@ public class SparkEtlJob {
     private Set<Long> hiveSourceTables;
     private Map<Long, Set<String>> tableToBitmapDictColumns;
     private Map<Long, Set<String>> tableToBinaryBitmapColumns;
+    private final SparkConf conf;
     private SparkSession spark;
 
     private SparkEtlJob(String jobConfigFilePath) {
@@ -70,10 +78,10 @@ public class SparkEtlJob {
         this.hiveSourceTables = Sets.newHashSet();
         this.tableToBitmapDictColumns = Maps.newHashMap();
         this.tableToBinaryBitmapColumns = Maps.newHashMap();
+        conf = new SparkConf();
     }
 
-    private void initSparkEnvironment() {
-        SparkConf conf = new SparkConf();
+    private void initSpark() {
         //serialization conf
         conf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrator", 
"org.apache.doris.load.loadv2.dpp.DorisKryoRegistrator");
@@ -86,14 +94,19 @@ public class SparkEtlJob {
             return;
         }
         for (Map.Entry<String, String> entry : configs.entrySet()) {
-            spark.sparkContext().conf().set(entry.getKey(), entry.getValue());
+            conf.set(entry.getKey(), entry.getValue());
+            conf.set("spark.hadoop." + entry.getKey(), entry.getValue());
         }
     }
 
-    private void initConfig() {
+    private void initConfig() throws IOException {
         LOG.debug("job config file path: " + jobConfigFilePath);
-        Dataset<String> ds = spark.read().textFile(jobConfigFilePath);
-        String jsonConfig = ds.first();
+        Configuration hadoopConf = 
SparkHadoopUtil.get().newConfiguration(this.conf);
+        String jsonConfig;
+        Path path = new Path(jobConfigFilePath);
+        try (FileSystem fs = path.getFileSystem(hadoopConf); DataInputStream 
in = fs.open(path)) {
+            jsonConfig = CharStreams.toString(new InputStreamReader(in));
+        }
         LOG.debug("rdd read json config: " + jsonConfig);
         etlJobConfig = EtlJobConfig.configFromJson(jsonConfig);
         LOG.debug("etl job config: " + etlJobConfig);
@@ -241,12 +254,12 @@ public class SparkEtlJob {
             }
         }
 
+        initSpark();
         // data partition sort and aggregation
         processDpp();
     }
 
     private void run() throws Exception {
-        initSparkEnvironment();
         initConfig();
         checkConfig();
         processData();
diff --git 
a/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java
 
b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java
index 8b94937805..0ea7f66092 100644
--- 
a/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java
+++ 
b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java
@@ -31,14 +31,19 @@ import org.apache.doris.sparkdpp.EtlJobConfig.EtlTable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import mockit.Expectations;
-import mockit.Injectable;
 import mockit.Mocked;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.SparkSession;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -92,20 +97,15 @@ public class SparkEtlJobTest {
     }
 
     @Test
-    public void testInitConfig(@Mocked SparkSession spark, @Injectable 
Dataset<String> ds) {
+    public void testInitConfig(@Mocked FileSystem fs) throws IOException {
         new Expectations() {
             {
-                SparkSession.builder().enableHiveSupport().getOrCreate();
-                result = spark;
-                spark.read().textFile(anyString);
-                result = ds;
-                ds.first();
-                result = etlJobConfig.configToJson();
+                fs.open(new Path("hdfs://127.0.0.1:10000/jobconfig.json"));
+                result = new FSDataInputStream(new 
SeekableByteArrayInputStream(etlJobConfig.configToJson().getBytes()));
             }
         };
 
         SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, 
"hdfs://127.0.0.1:10000/jobconfig.json");
-        Deencapsulation.invoke(job, "initSparkEnvironment");
         Deencapsulation.invoke(job, "initConfig");
         EtlJobConfig parsedConfig = Deencapsulation.getField(job, 
"etlJobConfig");
         Assert.assertTrue(parsedConfig.tables.containsKey(tableId));
@@ -150,4 +150,45 @@ public class SparkEtlJobTest {
         // check remove v2 bitmap_dict func mapping from file group column 
mappings
         
Assert.assertFalse(table.fileGroups.get(0).columnMappings.containsKey("v2"));
     }
+
+    private static class SeekableByteArrayInputStream extends 
ByteArrayInputStream implements Seekable, PositionedReadable {
+        public SeekableByteArrayInputStream(byte[] buf) {
+            super(buf);
+        }
+
+        public void seek(long position) {
+            if (position < 0 || position >= buf.length) {
+                throw new IllegalArgumentException("pos = " + position + " 
length = " + buf.length);
+            }
+            this.pos = (int) position;
+        }
+
+        public long getPos() {
+            return this.pos;
+        }
+
+        @Override
+        public boolean seekToNewSource(long targetPos) throws IOException {
+            return false;
+        }
+
+        @Override
+        public int read(long position, byte[] buffer, int offset, int length) 
throws IOException {
+            this.seek(position);
+            return this.read(buffer, offset, length);
+        }
+
+        @Override
+        public void readFully(long position, byte[] buffer, int offset, int 
length) throws IOException {
+            if (position + length > buf.length) {
+                throw  new EOFException("End of file reached before reading 
fully.");
+            }
+            System.arraycopy(buf, (int) position, buffer, offset, length);
+        }
+
+        @Override
+        public void readFully(long position, byte[] buffer) throws IOException 
{
+            readFully(position, buffer, 0, buffer.length);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to