[ 
https://issues.apache.org/jira/browse/SPARK-47156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marc Le Bihan resolved SPARK-47156.
-----------------------------------
    Resolution: Not A Bug

I was lacking Spark knowledge, and learned that executor don't have context to 
give to anyone, at runtime.

> SparkSession returns a null context during a dataset creation
> -------------------------------------------------------------
>
>                 Key: SPARK-47156
>                 URL: https://issues.apache.org/jira/browse/SPARK-47156
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.2
>         Environment: Debian 12
> Java 17
>            Reporter: Marc Le Bihan
>            Priority: Major
>
> I need first to know if I'm in front of a bug or not.
> If it's the case, I'll manage to create a test to help you reproduce the 
> case, but if it isn't, maybe Spark documentation could explain when 
> {{sparkSession.getContext()}} can return {{{}null{}}}.
>  
> I'm willing to ease my development by separating :
>  * parquet files management \{ checking existence, then loading them as 
> cache, or saving data to them },
>  * from dataset creation, when it doesn't exist yet, and should be 
> constituted from scratch.
>  
> The method I'm using is this one:
> {code:java}
> {code:Java}
> protected Dataset<Row> constitutionStandard(OptionsCreationLecture 
> optionsCreationLecture,
>    Supplier<Dataset<Row>> worker, CacheParqueteur<? extends TriParqueteurIF> 
> cacheParqueteur) {
>    OptionsCreationLecture options = optionsCreationLecture != null ? 
> optionsCreationLecture : optionsCreationLecture();
>    Dataset<Row> dataset = cacheParqueteur.call(options.useCache());
>    return dataset == null ? 
> cacheParqueteur.save(cacheParqueteur.appliquer(worker.get())) : dataset;
> }
> {code}
> In case the dataset doesn't exist in parquet files (= cache) yet, it starts 
> its creation by calling a {{worker.get()}} that is a {{Supplier}} of 
> {{{}Dataset<Row>{}}}.
>  
> A concrete usage is this one:
> {code:java}
> {code:Java}
> public Dataset<Row> rowEtablissements(OptionsCreationLecture 
> optionsCreationLecture, HistoriqueExecution historiqueExecution, int 
> anneeCOG, int anneeSIRENE, boolean actifsSeulement, boolean communesValides, 
> boolean nomenclaturesNAF2Valides) {
>    OptionsCreationLecture options = optionsCreationLecture != null ? 
> optionsCreationLecture : optionsCreationLecture();
>    Supplier<Dataset<Row>> worker = () -> {
>       super.setStageDescription(this.messageSource, 
> "row.etablissements.libelle.long", "row.etablissements.libelle.court", 
> anneeSIRENE, anneeCOG, actifsSeulement, communesValides, 
> nomenclaturesNAF2Valides);
>       
>       Map<String, Integer> indexs = new HashMap<>();
>       Dataset<Row> etablissements = 
> etablissementsNonFiltres(optionsCreationLecture, anneeSIRENE);
>       etablissements = etablissements.filter(
>          (FilterFunction<Row>)etablissement -> 
> this.validator.validationEtablissement(this.session, historiqueExecution, 
> etablissement, actifsSeulement, nomenclaturesNAF2Valides, indexs));
>       // Si le filtrage par communes valides a été demandé, l'appliquer.
>       if (communesValides) {
>          etablissements = rowRestreindreAuxCommunesValides(etablissements, 
> anneeCOG, anneeSIRENE, indexs);
>       }
>       else {
>          etablissements = etablissements.withColumn("codeDepartement", 
> substring(CODE_COMMUNE.col(), 1, 2));
>       }
>       // Associer les libellés des codes APE/NAF.
>       Dataset<Row> nomenclatureNAF = 
> this.nafDataset.rowNomenclatureNAF(anneeSIRENE);
>       etablissements = etablissements.join(nomenclatureNAF, 
> etablissements.col("activitePrincipale").equalTo(nomenclatureNAF.col("codeNAF"))
>  , "left_outer")
>          .drop("codeNAF", "niveauNAF");
>       // Le dataset est maintenant considéré comme valide, et ses champs 
> peuvent être castés dans leurs types définitifs.
>       return this.validator.cast(etablissements);
>    };
>    return constitutionStandard(options, () -> worker.get()
>       .withColumn("partitionSiren", SIREN_ENTREPRISE.col().substr(1,2)),
>       new CacheParqueteur<>(options, this.session,
>          "etablissements", 
> "annee_{0,number,#0}-actifs_{1}-communes_verifiees_{2}-nafs_verifies_{3}", 
> DEPARTEMENT_SIREN_SIRET,
>          anneeSIRENE, anneeCOG, actifsSeulement, communesValides));
> } {code}
>  
> In the worker, a filter calls a {{validationEtablissement(SparkSession, 
> HistoriqueExecution, Row, ...)}} on each row to perform complete checking 
> (eight rules to check for an establishment validity).
> When a check fails, along with a warning log, I'm also counting in 
> {{historiqueExecution}} object the number of problems of that kind I've 
> encountered.
> That function increase a {{longAccumulator}} value, and create that 
> accumulator first, that it stores in a {{{}Map<String, LongAccumulator> 
> accumulators{}}},  if needed.
> {code:java}
> {code:Java}
> public void incrementerOccurrences(SparkSession session, String 
> codeOuFormatMessage, boolean creerSiAbsent) {
>    LongAccumulator accumulator = accumulators.get(codeOuFormatMessage);
>    if (accumulator == null && creerSiAbsent) {
>       accumulator = 
> session.sparkContext().longAccumulator(codeOuFormatMessage);
>       accumulators.put(codeOuFormatMessage, accumulator);
>    }
>    if (accumulator != null) {
>       accumulator.add(1);
>    }
> }
> {code}
>  
> Or at least, it should. But my problem is that it isn't the case.
> During Dataset constitution :
> *1)* If I initialize the {{historiqueExecution}} variable with the exhaustive 
> list of messages it can have to count, +*before*+ the {{worker.get()}} is 
> called by the {{constitutionStandard}} method, the dataset is perfectly 
> constituted and I can dump my counters :
> {code:java}
> {code:Java}
> historiqueExecution.setCodesMessages(this.session,
>    "etablissement sans SIRET, peut être étranger : '{}'",
>    "etablissement au SIRET '{}' invalide, écarté.",
>    "etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son 
> SIRET vaut : {}), écarté.",
>    "etablissement au SIREN d'entreprise '{}' invalide, écarté.",
>    "établissement de SIRET {} écarté : il n'a pas de nomemclature APE.",
>    "établissement de SIRET {} écarté : sa nomemclature {} n'est plus 
> soutenue.",
>    "établissement de SIRET {} écarté : il n'a pas de code APE.",
>    "établissement de SIRET {} écarté : son code APE {} est invalide.",
>    "établissement de SIRET {} écarté : son type de voie d'adresse principale, 
> {}, est invalide",
>    "établissement de SIRET {} écarté : son type de voie d'adresse secondaire, 
> {}, est invalide",
>    "établissement de SIRET {} écarté : sa date de création, {}, est invalide",
>    "établissement de SIRET {} écarté : sa date de dernier traitement, {}, est 
> invalide : {}",
>    "établissement de SIRET {} écarté : sa date d'historisation, {}, est 
> invalide");// code placeholder
> {code}
> {noformat}
> etablissement au SIRET '{}' invalide, écarté. : 0
> établissement de SIRET {} écarté : sa nomemclature {} n'est plus soutenue. : 0
> etablissement sans SIREN d'entreprise, peut être étranger : '{}' (son SIRET 
> vaut : {}), écarté. : 0
> etablissement sans SIRET, peut être étranger : '{}' : 0
> établissement de SIRET {} écarté : son code APE {} est invalide. : 0
> établissement de SIRET {} écarté : il n'a pas de code APE. : 0
> établissement de SIRET {} écarté : sa date d'historisation, {}, est invalide 
> : 0
> établissement de SIRET {} écarté : son type de voie d'adresse principale, {}, 
> est invalide : 2
> établissement de SIRET {} écarté : sa date de dernier traitement, {}, est 
> invalide : {} : 0
> etablissement au SIREN d'entreprise '{}' invalide, écarté. : 0
> établissement de SIRET {} écarté : il n'a pas de nomemclature APE. : 0
> établissement de SIRET {} écarté : son type de voie d'adresse secondaire, {}, 
> est invalide : 0
> établissement de SIRET {} écarté : sa date de création, {}, est invalide : 
> 0{noformat}
> *2)* But if I don't initialize that list, and that I leave the creation of 
> the missing {{longAccumulator}} to the worker itself, at runtime (filter 
> time), then the :
> {{session.sparkContext().longAccumulator(codeOuFormatMessage);}}
> fails on a {{{}NullPointerException{}}}, as {{sparkContext()}} returns null.
>  
> My first question is : is it normal that, when taking the real actions to 
> build the dataset, the spark session returns a {{null}} context?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to