M. Le Bihan created SPARK-27840:
-----------------------------------
Summary: Hadoop attempts to create a temporary folder in root
folder
Key: SPARK-27840
URL: https://issues.apache.org/jira/browse/SPARK-27840
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.4.3
Reporter: M. Le Bihan
I have a REST web-service that calls a Spring-boot service.
{code:java}
/**
* Exporter les comptes de résultats et activités par niveau NAF, pour une
série d'intercommunalités, dans un fichier CSV.
* @param anneeCOG Année du COG.
* @param anneeSIRENE Année des données SIRENE à prendre en considération,
pour l'extraction des données entreprise/établissement.
* @param anneeComptesResultats Année des données Comptes de résultats à
prendre en considération.
* @param niveauNAF Niveau NAF de regroupement.
* @param codesIntercommunalites Code EPCI / SIREN des intercommunalités.
* @return Fichier d'exportation des comptes de résultats et activités
majeures des communes par niveau NAF.
* @throws IntercommunaliteAbsenteDansCommuneSiegeException si
l'intercommunalité désirée n'existe pas.
* @throws TechniqueException si un incident survient.
* @throws IOException
*/
@RequestMapping("/activites/communes/exporterActivitesIntercommunalites")
@Produces("text/csv")
public String exporterCSV(@RequestParam(name="anneeCOG") int anneeCOG,
@RequestParam(name="anneeSIRENE") int anneeSIRENE,
@RequestParam(name="anneeComptesResultats") int anneeComptesResultats,
@RequestParam(name="niveauNAF") int niveauNAF,
@RequestParam(name="codesIntercommunalites") String[]
codesIntercommunalites) throws
IntercommunaliteAbsenteDansCommuneSiegeException, TechniqueException,
IOException {
SIRENCommune[] sirenIntercommunalites = new
SIRENCommune[codesIntercommunalites.length];
for(int index=0; index < codesIntercommunalites.length; index ++) {
sirenIntercommunalites[index] = new
SIRENCommune(codesIntercommunalites[index]);
}
File tempCSV = new File(this.environnement.getProperty("java.io.tmpdir")
+ MessageFormat.format("{0,number,#0}", System.currentTimeMillis()));
File sortieCSV =
this.impactActivitesCommunalesService.exporterCSV(tempCSV, anneeCOG,
anneeSIRENE, anneeComptesResultats, niveauNAF, sirenIntercommunalites);
StringBuilder contenuCSV = new StringBuilder();
try(Stream<String> stream = Files.lines(sortieCSV.toPath(),
StandardCharsets.UTF_8)) {
stream.forEach(s -> contenuCSV.append(s).append("\n"));
}
return contenuCSV.toString();
}{code}
The Spring service create a Dataset, and then a CSV file from it, and return
that CSV to the rest web-service (it will have only 40 - 50 lines).
{code:java}
/**
* Exporter les comptes de résultats et activités par niveau NAF, pour une
série d'intercommunalités, dans un fichier CSV.
* @param anneeCOG Année du COG.
* @param anneeSIRENE Année des données SIRENE à prendre en considération,
pour l'extraction des données entreprise/établissement.
* @param anneeComptesResultats Année des données Comptes de résultats à
prendre en considération.
* @param niveauNAF Niveau NAF de regroupement.
* @param codesIntercommunalites Code EPCI / SIREN des intercommunalités.
* @return Fichier d'exportation des comptes de résultats et activités
majeures des communes par niveau NAF.
* @throws IntercommunaliteAbsenteDansCommuneSiegeException si
l'intercommunalité désirée n'existe pas.
* @throws TechniqueException si un incident survient.
*/
public File exporterCSV(File sortieCSV, int anneeCOG, int anneeSIRENE, int
anneeComptesResultats, int niveauNAF, SIRENCommune... codesIntercommunalites)
throws IntercommunaliteAbsenteDansCommuneSiegeException, TechniqueException {
Objects.requireNonNull(sortieCSV, "Le fichier CSV de sortie ne peut pas
valoir null.");
JavaPairRDD<CodeCommune, Tuple2<ComptesResultats,
ActivitesCommunaleParNAF>> intercos =
rddActivitesEtComptesResultatsCommunes(anneeCOG, anneeSIRENE,
anneeComptesResultats, niveauNAF, codesIntercommunalites);
Dataset<Row> ds = toDataset(anneeCOG, intercos);
ds.coalesce(1).write().mode(SaveMode.Overwrite).option("header",
"true").option("quoteMode", "NON_NUMERIC").option("quote",
"\"").csv(sortieCSV.getAbsolutePath());
// Dresser la liste des fichiers d'extension .csv produits.
try {
List<File> fichiersCSV = Files.walk(sortieCSV.toPath())
// Rechercher dans le répertoire de sortie
.map(c -> c.toFile())
// les Path convertis en File,
.filter(c -> c.isDirectory() == false &&
c.getName().endsWith(".csv")) // qui sont des fichiers CSV
.collect(Collectors.toList());
// et les renvoyer en liste.
// S'il y en a un nombre différent d'un, nous sommes face à une
anomalie.
if (fichiersCSV.size() != 1) {
String message =
BabelTower.format(ImpactActivitesCommunalesService.class,
"anomalie.production_nombre_csv_intercos",
Arrays.asList(codesIntercommunalites),
sortieCSV.getAbsolutePath(), fichiersCSV.size());
PersistenceException ex = new PersistenceException(message);
LOGGER.error(ex.getMessage());
throw ex;
}
LOGGER.info("Activités et comptes de résultats ont étés exportés dans
le fichier CSV {}.", sortieCSV.getAbsolutePath());
return fichiersCSV.get(0);
}
catch(IOException e) {
String message =
BabelTower.format(ImpactActivitesCommunalesService.class,
"anomalie.production_csv_intercos", codesIntercommunalites,
sortieCSV.getAbsolutePath(), e.getMessage());
PersistenceException ex = new PersistenceException(message, e);
LOGGER.error(ex.getMessage(), e);
throw ex;
}
}
{code}
Everything is going ok during 15 minutes, all the steps are working well. But
when the driver is handling the CSV :
{code:java}
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
[spark-sql_2.11-2.4.3.jar:2.4.3]
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
[spark-sql_2.11-2.4.3.jar:2.4.3]
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
[spark-sql_2.11-2.4.3.jar:2.4.3]
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
[spark-sql_2.11-2.4.3.jar:2.4.3]
at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
[spark-sql_2.11-2.4.3.jar:2.4.3]
at
fr.comptes.france.metier.application.spark.acquisition.comptes.resultats.ImpactActivitesCommunalesService.exporterCSV(ImpactActivitesCommunalesService.java:216)
[classes/:na]
at
fr.comptes.france.metier.application.ActivitesController.exporterCSV(ActivitesController.java:102)
[classes/:na]{code}
Hadoop fails on creating a temporary folder because it attempts to create it on
my root folder :
{code:java}
java.io.IOException: Mkdirs failed to create
file:/tmp1558800629493/_temporary/0/_temporary/attempt_20190525182751_0014_m_000000_339
(exists=false,
cwd=file:/home/marc/dev/Java/comptes-france/dev/metier-et-gestion/dev/ApplicationMetierEtGestion)
at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:447)
~[hadoop-common-2.6.5.jar:na]
at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:433)
~[hadoop-common-2.6.5.jar:na]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
~[hadoop-common-2.6.5.jar:na]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
~[hadoop-common-2.6.5.jar:na]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
~[hadoop-common-2.6.5.jar:na]
at
org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
~[spark-sql_2.11-2.4.3.jar:2.4.3]
at
org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
~[spark-sql_2.11-2.4.3.jar:2.4.3]
at
org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
~[spark-sql_2.11-2.4.3.jar:2.4.3]
at
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
~[spark-sql_2.11-2.4.3.jar:2.4.3]
at
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
~[spark-sql_2.11-2.4.3.jar:2.4.3]
at
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
~[spark-sql_2.11-2.4.3.jar:2.4.3]
at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
~[spark-sql_2.11-2.4.3.jar:2.4.3]
at
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
~[spark-sql_2.11-2.4.3.jar:2.4.3]
at
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
~[spark-sql_2.11-2.4.3.jar:2.4.3]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
~[spark-core_2.11-2.4.3.jar:2.4.3]
at org.apache.spark.scheduler.Task.run(Task.scala:121)
~[spark-core_2.11-2.4.3.jar:2.4.3]
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
~[spark-core_2.11-2.4.3.jar:2.4.3]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
~[spark-core_2.11-2.4.3.jar:2.4.3]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
~[spark-core_2.11-2.4.3.jar:2.4.3]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[na:1.8.0_212]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[na:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]{code}
without taking into account my parameters in SparkConf :
{code:java}
spark.master : local
java.io.tmpdir : /data/tmp
hadoop.tmp.dir : /data/tmp
spark.home : .
spark.driver.memory : 16073741824
spark.local.dir : /data/tmp
spark.testing.memory : 16073741824
spark.driver.maxResultSize : 15073741824
spark.app.name : Comptes france : metier et gestion
spark.executor.extraJavaOptions : -Djava.io.tmpdir=/data/tmp
spark.driver.extraJavaOptions : -Djava.io.tmpdir=/data/tmp{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]