[ https://issues.apache.org/jira/browse/SPARK-35252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lynn updated SPARK-35252: ------------------------- Attachment: spark-sqlconf-isnull.png > PartitionReaderFactory's Implemention Class of DataSourceV2 sqlConf parameter > is null > ------------------------------------------------------------------------------------- > > Key: SPARK-35252 > URL: https://issues.apache.org/jira/browse/SPARK-35252 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 3.0.2 > Reporter: lynn > Priority: Major > Attachments: spark-sqlconf-isnull.png, znbase-sqlconf-isnull.png > > > The codes of "MyPartitionReaderFactory" : > {code:java} > // Implemention Class > package com.lynn.spark.sql.v2 > import org.apache.spark.internal.Logging > import org.apache.spark.sql.catalyst.InternalRow > import > com.lynn.spark.sql.v2.MyPartitionReaderFactory.{MY_VECTORIZED_READER_BATCH_SIZE, > MY_VECTORIZED_READER_ENABLED} > import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, > PartitionReaderFactory} > import org.apache.spark.sql.internal.SQLConf > import org.apache.spark.sql.types.StructType > import org.apache.spark.sql.vectorized.ColumnarBatch > import org.apache.spark.sql.internal.SQLConf.buildConf > case class MyPartitionReaderFactory(sqlConf: SQLConf, > dataSchema: StructType, > readSchema: StructType) > extends PartitionReaderFactory with Logging { > val enableVectorized = sqlConf.getConf(MY_VECTORIZED_READER_ENABLED, false) > val batchSize = sqlConf.getConf(MY_VECTORIZED_READER_BATCH_SIZE, 4096) > override def createReader(partition: InputPartition): > PartitionReader[InternalRow] = { > MyRowReader(batchSize, dataSchema, readSchema) > } > override def createColumnarReader(partition: InputPartition): > PartitionReader[ColumnarBatch] = { > if(!supportColumnarReads(partition)) > throw new UnsupportedOperationException("Cannot create columnar > reader.") > MyColumnReader(batchSize, dataSchema, readSchema) > } > override def supportColumnarReads(partition: InputPartition) = > enableVectorized > } > object 
MyPartitionReaderFactory { > val MY_VECTORIZED_READER_ENABLED = > buildConf("spark.sql.my.enableVectorizedReader") > .doc("Enables vectorized my source scan.") > .version("1.0.0") > .booleanConf > .createWithDefault(false) > val MY_VECTORIZED_READER_BATCH_SIZE = > buildConf("spark.sql.my.columnarReaderBatchSize") > .doc("The number of rows to include in a my source vectorized reader > batch. The number should " + > "be carefully chosen to minimize overhead and avoid OOMs in reading > data.") > .version("1.0.0") > .intConf > .createWithDefault(4096) > } > {code} > The driver constructs an RDD instance; the sqlConf parameter passed to the > MyPartitionReaderFactory is not null. > But when the executor deserializes the RDD, the sqlConf parameter is null. -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org