[
https://issues.apache.org/jira/browse/SPARK-54302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18039869#comment-18039869
]
Maxim Martynov edited comment on SPARK-54302 at 11/21/25 2:11 PM:
------------------------------------------------------------------
I'm using Spark with the ClickHouse JDBC driver to read a Hive table backed by .orc files
and write the data to a ClickHouse table.
docker-compose.yml
{code:yaml}
services:
  clickhouse:
    image: clickhouse/clickhouse-server:latest-alpine
    environment:
      TZ: UTC
      CLICKHOUSE_DB: default
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: 123UsedForTestOnly@!
      CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
    ports:
      - 8123:8123
      - 9001:9000
    sysctls:
      - net.ipv6.conf.all.disable_ipv6=1
{code}
{code:bash}
pip install pyspark[sql]==3.5.7
{code}
{code:python}
from pyspark.sql import SparkSession

maven_packages = [
    "com.clickhouse:clickhouse-jdbc-all:0.9.3",
]

spark = (
    SparkSession.builder
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

# in actual code it's `spark.table("my_table_on_orc_files")`
df = spark.createDataFrame(
    [{"id": 1, "name": "John"}, {"id": 2, "name": "Jane"}],
    "id: int, name: string",
)

(
    df.write.format("jdbc")
    .option("url", "jdbc:clickhouse://localhost:8123")
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
    .option("user", "default")
    .option("password", "123UsedForTestOnly@!")
    .option("dbtable", "default.newtable")
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY id")
    .mode("overwrite")
    .save()
)
{code}
Spark has a well-known quirk: all columns read from .orc files have
{{nullable = true}}, even if a column in a particular file contains no NULLs.
{code}
df.printSchema()
# root
# |-- id: integer (nullable = true)
# |-- name: string (nullable = true)
{code}
By default, Spark uses a generic JDBC dialect for ClickHouse and thus generates DDL
like this:
https://github.com/apache/spark/blob/v3.5.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L141
{code:sql}
CREATE TABLE default.newtable (
id INTEGER,
name TEXT
)
ENGINE = MergeTree() ORDER BY id
{code}
This is because
[JdbcUtils|https://github.com/apache/spark/blob/v3.5.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L823]
adds a {{NOT NULL}} suffix to non-nullable columns and no suffix to nullable ones.
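For illustration only, here is a rough Python sketch of that column-definition behaviour (the function and argument names are made up; the real logic lives in the Scala source linked above):
{code:python}
# Rough sketch of the JdbcUtils.schemaString behaviour (illustrative Python,
# not the real Scala code): the dialect maps each Catalyst type to a database
# type name, and "NOT NULL" is appended only for non-nullable fields.
def schema_string(fields, get_db_type):
    columns = []
    for name, data_type, nullable in fields:
        db_type = get_db_type(data_type)              # e.g. "int" -> "INTEGER"
        suffix = "" if nullable else " NOT NULL"
        columns.append(f"{name} {db_type}{suffix}")
    return ",\n".join(columns)

# With every ORC-derived column marked nullable, no "NOT NULL" is ever emitted:
print(schema_string(
    [("id", "int", True), ("name", "string", True)],
    {"int": "INTEGER", "string": "TEXT"}.get,
))
# id INTEGER,
# name TEXT
{code}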
On the ClickHouse side, the DDL above is interpreted as:
{code:sql}
CREATE TABLE default.newtable
(
`id` Int32,
`name` String
)
ENGINE = MergeTree
ORDER BY id
SETTINGS index_granularity = 8192
{code}
But {{Int32}} and {{String}} here are non-nullable types. It should instead be:
{code:sql}
CREATE TABLE default.newtable
(
`id` Nullable(Int32),
`name` Nullable(String)
)
ENGINE = MergeTree
ORDER BY id
SETTINGS index_granularity = 8192
{code}
This is fixed by a [custom JDBC dialect for
Spark|https://github.com/MobileTeleSystems/spark-dialect-extension]. The problem is
not really about Integer or String columns, since writing NULL into those inserts 0 or an
empty string. But other types, like Array(), don't support inserting NULLs
into non-Nullable columns.
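In other words, a ClickHouse-aware dialect has to wrap the type itself rather than rely on a suffix. A minimal sketch of that mapping (hypothetical helper, not the dialect's actual code):
{code:python}
# Hypothetical helper showing the column definition a ClickHouse-aware dialect
# needs to emit: nullable columns are wrapped in Nullable(...), non-nullable
# columns keep the plain type.
def clickhouse_column(name, ch_type, nullable):
    wrapped = f"Nullable({ch_type})" if nullable else ch_type
    return f"`{name}` {wrapped}"

print(clickhouse_column("id", "Int32", True))     # `id` Nullable(Int32)
print(clickhouse_column("name", "String", True))  # `name` Nullable(String)
{code}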
But with a {{Nullable(Int32)}} column in the CREATE TABLE statement we hit another issue -
ClickHouse doesn't allow nullable columns in the sorting key:
{code:python}
from pyspark.sql import SparkSession

maven_packages = [
    "io.github.mtsongithub.doetl:spark-dialect-extension_2.12:0.0.3",
    "com.clickhouse:clickhouse-jdbc-all:0.9.3",
]

spark = (
    SparkSession.builder
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

# Register custom Clickhouse dialect
ClickhouseDialectRegistry = spark._jvm.io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse.ClickhouseDialectRegistry
ClickhouseDialectRegistry.register()

df = spark.createDataFrame(
    [{"id": 1, "name": "John"}, {"id": 2, "name": "Jane"}],
    "id: int, name: string",
)
df = df.where(df.id.isNotNull())

(
    df.write.format("jdbc")
    .option("url", "jdbc:clickhouse://localhost:8123")
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
    .option("user", "default")
    .option("password", "123UsedForTestOnly@!")
    .option("dbtable", "default.newtable")
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY id")
    .mode("overwrite")
    .save()
)
{code}
{code}
py4j.protocol.Py4JJavaError: An error occurred while calling o68.save.
: java.sql.SQLException: Code: 44. DB::Exception: Sorting key contains nullable
columns, but merge tree setting `allow_nullable_key` is disabled.
(ILLEGAL_COLUMN) (version 25.8.3.66 (official build))
at
com.clickhouse.jdbc.internal.ExceptionUtils.toSqlState(ExceptionUtils.java:67)
at
com.clickhouse.jdbc.internal.ExceptionUtils.toSqlState(ExceptionUtils.java:42)
at
com.clickhouse.jdbc.StatementImpl.executeUpdateImpl(StatementImpl.java:225)
at
com.clickhouse.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:620)
at
com.clickhouse.jdbc.StatementImpl.executeUpdate(StatementImpl.java:196)
at
io.github.mtsongithub.doetl.sparkdialectextensions.clickhouse.spark35.ClickhouseDialectExtension$.createTable(ClickhouseDialectExtension.scala:216)
at
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:923)
at
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
at
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.clickhouse.client.api.ServerException: Code: 44. DB::Exception:
Sorting key contains nullable columns, but merge tree setting
`allow_nullable_key` is disabled. (ILLEGAL_COLUMN) (version 25.8.3.66 (official
build))
at
com.clickhouse.client.api.internal.HttpAPIClientHelper.readError(HttpAPIClientHelper.java:398)
at
com.clickhouse.client.api.internal.HttpAPIClientHelper.executeRequest(HttpAPIClientHelper.java:461)
at com.clickhouse.client.api.Client.lambda$query$5(Client.java:1599)
at com.clickhouse.client.api.Client.runAsyncOperation(Client.java:2014)
at com.clickhouse.client.api.Client.query(Client.java:1644)
at com.clickhouse.client.api.Client.query(Client.java:1542)
at
com.clickhouse.jdbc.StatementImpl.executeUpdateImpl(StatementImpl.java:219)
... 46 more
{code}
According to https://clickhouse.com/docs/sql-reference/data-types/nullable:
{quote}
A Nullable type field can't be included in table indexes.
{quote}
So we should use a DataFrame with a schema like this:
{code}
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
{code}
to generate correct DDL with a non-nullable {{id}} column:
{code:sql}
CREATE TABLE default.newtable
(
`id` Int32,
`name` Nullable(String)
)
ENGINE = MergeTree
ORDER BY id
SETTINGS index_granularity = 8192
{code}
In most databases the plan optimizer can infer that this query:
{code:sql}
SELECT
    id,   -- nullable
    name  -- nullable
FROM table
WHERE id IS NOT NULL
{code}
returns columns of types {{INTEGER NOT NULL, TEXT NULL}}. But Spark's
{{.isNotNull()}} filter doesn't reset the nullable flag on the column.
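Until that changes, the nullability has to be fixed by hand before the write. Two workaround sketches of my own (assuming classic PySpark where {{df.rdd}} is available, and that the filter really has removed all NULL ids):
{code:python}
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType

df = df.where(df.id.isNotNull())

# Workaround 1: coalesce() to a dummy default resets nullability, but requires
# choosing a type-dependent literal that is never actually used.
df_coalesced = df.withColumn("id", F.coalesce(df.id, F.lit(0)))

# Workaround 2: rebuild the DataFrame with an explicitly non-nullable schema.
# This is only safe because the filter above already removed all NULL ids.
fixed_schema = StructType([
    StructField("id", df.schema["id"].dataType, nullable=False),
    StructField("name", df.schema["name"].dataType, nullable=True),
])
df_fixed = spark.createDataFrame(df.rdd, fixed_schema)
df_fixed.printSchema()
# root
#  |-- id: integer (nullable = false)
#  |-- name: string (nullable = true)
{code}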
> Filtering by isNotNull should return DataFrame with nullable=False
> ------------------------------------------------------------------
>
> Key: SPARK-54302
> URL: https://issues.apache.org/jira/browse/SPARK-54302
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.5.7
> Reporter: Maxim Martynov
> Priority: Major
>
> I have a DataFrame with a schema like this:
> {code:python}
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> df = spark.createDataFrame([{"a": 1},{"a": None}], schema="a:int")
> df.printSchema()
> """
> root
> |-- a: integer (nullable = true)
> """
> df.where(df.a.isNotNull()).printSchema()
> """
> root
> |-- a: integer (nullable = true)
> """
> {code}
> Currently, filters applied to a DataFrame don't change its schema. To make a
> column non-nullable I have to use coalesce:
> {code:python}
> import pyspark.sql.functions as F
> df.where(df.a.isNotNull()).select(F.coalesce(df.a, F.lit(0))).printSchema()
> """
> root
> |-- coalesce(a, 0): integer (nullable = false)
> """
> {code}
> But I have to choose the {{F.lit(...)}} value based on the column type, even if it
> will never be used.