[ 
https://issues.apache.org/jira/browse/SPARK-27840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Le Bihan resolved SPARK-27840.
---------------------------------
    Resolution: Not A Bug

Was mine own bug...

> Hadoop attempts to create a temporary folder in root folder
> -----------------------------------------------------------
>
>                 Key: SPARK-27840
>                 URL: https://issues.apache.org/jira/browse/SPARK-27840
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.3
>            Reporter: M. Le Bihan
>            Priority: Major
>
> I have a REST web-service that calls a Spring-boot service.
>  
> {code:java}
>    /**
>     * Exporter les comptes de résultats et activités par niveau NAF, pour une 
> série d'intercommunalités, dans un fichier CSV.
>     * @param anneeCOG Année du COG.
>     * @param anneeSIRENE Année des données SIRENE à prendre en considération, 
> pour l'extraction des données entreprise/établissement.
>     * @param anneeComptesResultats Année des données Comptes de résultats à 
> prendre en considération.
>     * @param niveauNAF Niveau NAF de regroupement.
>     * @param codesIntercommunalites Code EPCI / SIREN des intercommunalités.
>     * @return Fichier d'exportation des comptes de résultats et activités 
> majeures des communes par niveau NAF.
>     * @throws IntercommunaliteAbsenteDansCommuneSiegeException si 
> l'intercommunalité désirée n'existe pas. 
>     * @throws TechniqueException si un incident survient.
>     * @throws IOException 
>     */
>    @RequestMapping("/activites/communes/exporterActivitesIntercommunalites")
>    @Produces("text/csv")
>    public String exporterCSV(@RequestParam(name="anneeCOG") int anneeCOG, 
> @RequestParam(name="anneeSIRENE") int anneeSIRENE, 
>          @RequestParam(name="anneeComptesResultats") int 
> anneeComptesResultats, @RequestParam(name="niveauNAF") int niveauNAF, 
>          @RequestParam(name="codesIntercommunalites") String[] 
> codesIntercommunalites) throws 
> IntercommunaliteAbsenteDansCommuneSiegeException, TechniqueException, 
> IOException {
>       SIRENCommune[] sirenIntercommunalites = new 
> SIRENCommune[codesIntercommunalites.length];
>       
>       for(int index=0; index < codesIntercommunalites.length; index ++) {
>          sirenIntercommunalites[index] = new 
> SIRENCommune(codesIntercommunalites[index]);
>       }
>       
>       File tempCSV = new 
> File(this.environnement.getProperty("java.io.tmpdir") + 
> MessageFormat.format("{0,number,#0}", System.currentTimeMillis()));
>       File sortieCSV = 
> this.impactActivitesCommunalesService.exporterCSV(tempCSV, anneeCOG, 
> anneeSIRENE, anneeComptesResultats, niveauNAF, sirenIntercommunalites);
>       
>       StringBuilder contenuCSV = new StringBuilder();
>       
>       try(Stream<String> stream = Files.lines(sortieCSV.toPath(), 
> StandardCharsets.UTF_8)) {
>          stream.forEach(s -> contenuCSV.append(s).append("\n"));
>       }
>       return contenuCSV.toString();
>    }{code}
>  
> The Spring service create a Dataset, and then a CSV file from it, and return 
> that CSV to the rest web-service (it will have only 40 - 50 lines).
>  
> {code:java}
>    /**
>     * Exporter les comptes de résultats et activités par niveau NAF, pour une 
> série d'intercommunalités, dans un fichier CSV.
>     * @param anneeCOG Année du COG.
>     * @param anneeSIRENE Année des données SIRENE à prendre en considération, 
> pour l'extraction des données entreprise/établissement.
>     * @param anneeComptesResultats Année des données Comptes de résultats à 
> prendre en considération.
>     * @param niveauNAF Niveau NAF de regroupement.
>     * @param codesIntercommunalites Code EPCI / SIREN des intercommunalités.
>     * @return Fichier d'exportation des comptes de résultats et activités 
> majeures des communes par niveau NAF.
>     * @throws IntercommunaliteAbsenteDansCommuneSiegeException si 
> l'intercommunalité désirée n'existe pas. 
>     * @throws TechniqueException si un incident survient.
>     */
>    public File exporterCSV(File sortieCSV, int anneeCOG, int anneeSIRENE, int 
> anneeComptesResultats, int niveauNAF, SIRENCommune... codesIntercommunalites) 
> throws IntercommunaliteAbsenteDansCommuneSiegeException, TechniqueException {
>       Objects.requireNonNull(sortieCSV, "Le fichier CSV de sortie ne peut pas 
> valoir null.");
>       
>       JavaPairRDD<CodeCommune, Tuple2<ComptesResultats, 
> ActivitesCommunaleParNAF>> intercos = 
> rddActivitesEtComptesResultatsCommunes(anneeCOG, anneeSIRENE, 
> anneeComptesResultats, niveauNAF, codesIntercommunalites);
>       Dataset<Row> ds = toDataset(anneeCOG, intercos);
>       ds.coalesce(1).write().mode(SaveMode.Overwrite).option("header", 
> "true").option("quoteMode", "NON_NUMERIC").option("quote", 
> "\"").csv(sortieCSV.getAbsolutePath());
>       
>       // Dresser la liste des fichiers d'extension .csv produits.
>       try {
>          List<File> fichiersCSV = Files.walk(sortieCSV.toPath())              
>      // Rechercher dans le répertoire de sortie
>             .map(c -> c.toFile())                                             
>      // les Path convertis en File,
>             .filter(c -> c.isDirectory() == false && 
> c.getName().endsWith(".csv")) // qui sont des fichiers CSV
>             .collect(Collectors.toList());                                    
>      // et les renvoyer en liste.
>          
>          // S'il y en a un nombre différent d'un, nous sommes face à une 
> anomalie.
>          if (fichiersCSV.size() != 1) {
>             String message = 
> BabelTower.format(ImpactActivitesCommunalesService.class, 
> "anomalie.production_nombre_csv_intercos", 
>                Arrays.asList(codesIntercommunalites), 
> sortieCSV.getAbsolutePath(), fichiersCSV.size());
>             
>             PersistenceException ex = new PersistenceException(message);
>             LOGGER.error(ex.getMessage());
>             throw ex;
>          }
>          
>          LOGGER.info("Activités et comptes de résultats ont étés exportés 
> dans le fichier CSV {}.", sortieCSV.getAbsolutePath());
>          return fichiersCSV.get(0);
>       }
>       catch(IOException e) {
>          String message = 
> BabelTower.format(ImpactActivitesCommunalesService.class, 
> "anomalie.production_csv_intercos", codesIntercommunalites, 
> sortieCSV.getAbsolutePath(), e.getMessage());
>          
>          PersistenceException ex = new PersistenceException(message, e);
>          LOGGER.error(ex.getMessage(), e);
>          throw ex;
>       }
>    } 
> {code}
>  
> Everything is going ok during 15 minutes, all the steps are working well. But 
> when the driver is handling the CSV :
> {code:java}
>     at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) 
> [spark-sql_2.11-2.4.3.jar:2.4.3]
>     at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>  [spark-sql_2.11-2.4.3.jar:2.4.3]
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271) 
> [spark-sql_2.11-2.4.3.jar:2.4.3]
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229) 
> [spark-sql_2.11-2.4.3.jar:2.4.3]
>     at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664) 
> [spark-sql_2.11-2.4.3.jar:2.4.3]
>     at 
> fr.comptes.france.metier.application.spark.acquisition.comptes.resultats.ImpactActivitesCommunalesService.exporterCSV(ImpactActivitesCommunalesService.java:216)
>  [classes/:na]
>     at 
> fr.comptes.france.metier.application.ActivitesController.exporterCSV(ActivitesController.java:102)
>  [classes/:na]{code}
> Hadoop fails on creating a temporary folder because it attempts to create it 
> on my root folder :
> {code:java}
> java.io.IOException: Mkdirs failed to create 
> file:/tmp1558800629493/_temporary/0/_temporary/attempt_20190525182751_0014_m_000000_339
>  (exists=false, 
> cwd=file:/home/marc/dev/Java/comptes-france/dev/metier-et-gestion/dev/ApplicationMetierEtGestion)
>     at 
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:447) 
> ~[hadoop-common-2.6.5.jar:na]
>     at 
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:433) 
> ~[hadoop-common-2.6.5.jar:na]
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908) 
> ~[hadoop-common-2.6.5.jar:na]
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889) 
> ~[hadoop-common-2.6.5.jar:na]
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786) 
> ~[hadoop-common-2.6.5.jar:na]
>     at 
> org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
>  ~[spark-sql_2.11-2.4.3.jar:2.4.3]
>     at 
> org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
>  ~[spark-sql_2.11-2.4.3.jar:2.4.3]
>     at 
> org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
>  ~[spark-sql_2.11-2.4.3.jar:2.4.3]
>     at 
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
>  ~[spark-sql_2.11-2.4.3.jar:2.4.3]
>     at 
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
>  ~[spark-sql_2.11-2.4.3.jar:2.4.3]
>     at 
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
>  ~[spark-sql_2.11-2.4.3.jar:2.4.3]
>     at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
>  ~[spark-sql_2.11-2.4.3.jar:2.4.3]
>     at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>  ~[spark-sql_2.11-2.4.3.jar:2.4.3]
>     at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>  ~[spark-sql_2.11-2.4.3.jar:2.4.3]
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
> ~[spark-core_2.11-2.4.3.jar:2.4.3]
>     at org.apache.spark.scheduler.Task.run(Task.scala:121) 
> ~[spark-core_2.11-2.4.3.jar:2.4.3]
>     at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>  ~[spark-core_2.11-2.4.3.jar:2.4.3]
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 
> ~[spark-core_2.11-2.4.3.jar:2.4.3]
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) 
> ~[spark-core_2.11-2.4.3.jar:2.4.3]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  ~[na:1.8.0_212]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ~[na:1.8.0_212]
>     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]{code}
> without taking into account my parameters in SparkConf :
> {code:java}
> spark.master : local
> java.io.tmpdir : /data/tmp
> hadoop.tmp.dir : /data/tmp
> spark.home : .
> spark.driver.memory : 16073741824
> spark.local.dir : /data/tmp
> spark.testing.memory : 16073741824
> spark.driver.maxResultSize : 15073741824
> spark.app.name : Comptes france : metier et gestion
> spark.executor.extraJavaOptions : -Djava.io.tmpdir=/data/tmp
> spark.driver.extraJavaOptions : -Djava.io.tmpdir=/data/tmp{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to