shivaprasad-basavaraj opened a new issue, #6974: URL: https://github.com/apache/iceberg/issues/6974
### Apache Iceberg version

1.0.0

### Query engine

Spark

### Please describe the bug 🐞

I am trying to write an application that reads from a Kafka topic and writes the data to an Iceberg table. The event data from Kafka is in JSON format, so I process the streaming DataFrame with `from_json()` so that the Iceberg table gets the correct schema instead of a dumped JSON string. My test application below works fine when it writes to the console.

Code:

```python
import sys

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, FloatType

NESSIE_URI = "http://10.164.64.154:30324/api/v1"
AWS_ACCESS_KEY = "console"                     ## AWS CREDENTIALS
AWS_SECRET_KEY = "console123"                  ## AWS CREDENTIALS
MINIO_ENDPOINT = "http://10.164.64.27:32000"   ## POINT TO MINIO

KAFKA_TOPIC = sys.argv[1]
ICEBERG_TABLE_NAME = "nessie.icebergdb." + KAFKA_TOPIC

print("------------------------------------------------------")
print("Processing spark job for topic " + KAFKA_TOPIC)
print("------------------------------------------------------")

conf = (
    pyspark.SparkConf()
        .setAppName(KAFKA_TOPIC)
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', NESSIE_URI)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.nessie.warehouse', 's3a://iceberg')
        .set('spark.sql.catalog.nessie.s3.endpoint', MINIO_ENDPOINT)
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel('WARN')
print("Spark Session created")

nessie_branch = 'test_json_df'
spark.sql(f"CREATE BRANCH IF NOT EXISTS {nessie_branch} IN nessie FROM main")
refs_list = spark.sql("LIST REFERENCES IN nessie")
refs_list.show()
spark.sql(f"USE REFERENCE {nessie_branch} IN nessie")

spark.sql(f"CREATE TABLE IF NOT EXISTS {ICEBERG_TABLE_NAME} \
    (key STRING, value STRING) USING iceberg")

print("-------------------------------------------------")
print("Reading from topic " + KAFKA_TOPIC)
print("-------------------------------------------------")

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "frenchie:31092,kimiko:31092,hughie:31092") \
    .option("subscribe", KAFKA_TOPIC) \
    .option("startingOffsets", "earliest") \
    .load()

print("Spark read stream created")

schema = StructType([
    StructField("battery_num", IntegerType(), True),
    StructField("ambient_temperature", FloatType(), True),
    StructField("time", IntegerType(), True),
    StructField("ID", IntegerType(), True),
    StructField("type", StringType(), True),
])

query = df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*") \
    .writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()
```

Output:

```
-------------------------------------------------
Reading from topic events
-------------------------------------------------
Spark read stream created
StructType([StructField('key', BinaryType(), True), StructField('value', BinaryType(), True), StructField('topic', StringType(), True), StructField('partition', IntegerType(), True), StructField('offset', LongType(), True), StructField('timestamp', TimestampType(), True), StructField('timestampType', IntegerType(), True)])
23/03/01 13:07:57 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c930c979-fbed-4118-be26-36e0778bb637. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/01 13:07:57 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+-------------------+----+---+---------+
|battery_num|ambient_temperature|time|ID |type     |
+-----------+-------------------+----+---+---------+
|1          |0.9596235          |0   |0  |discharge|
|1          |7.16299            |3   |1  |impedance|
|1          |14.47257           |4   |2  |impedance|
|1          |33.66843           |5   |3  |impedance|
|1          |-5.42997           |6   |4  |discharge|
|1          |-5.296253          |8   |5  |charge   |
|1          |-3.0481791         |11  |6  |impedance|
|1          |-6.6242948         |12  |7  |discharge|
|1          |36.216015          |14  |8  |discharge|
|1          |38.60477           |16  |9  |charge   |
+-----------+-------------------+----+---+---------+
```

However, if I try to write this stream to Iceberg by replacing the `writeStream` code with the code below:

```python
query = df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*") \
    .writeStream \
    .format("iceberg") \
    .trigger(processingTime='1 seconds') \
    .option("path", ICEBERG_TABLE_NAME) \
    .option("checkpointLocation", "/tmp/checkpoints") \
    .start() \
    .awaitTermination()
```

I get the following error:

```
-------------------------------------------------
Reading from topic events
-------------------------------------------------
Spark read stream created
StructType([StructField('key', BinaryType(), True), StructField('value', BinaryType(), True), StructField('topic', StringType(), True), StructField('partition', IntegerType(), True), StructField('offset', LongType(), True), StructField('timestamp', TimestampType(), True), StructField('timestampType', IntegerType(), True)])
23/03/01 13:08:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/03/01 13:08:59 ERROR MicroBatchExecution: Query [id = f14cf37d-1a9b-4340-bc5c-9a5ddb0753a3, runId = 77a1d9c7-f9ea-4eba-a722-70b3fa2cb023] terminated with error
java.lang.IllegalArgumentException: Field battery_num not found in source schema
	at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:103)
	at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:27)
	at org.apache.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:571)
	at org.apache.iceberg.relocated.com.google.common.collect.Iterators$6.transform(Iterators.java:826)
	at org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:52)
	at org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:367)
	at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:147)
	at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:133)
	....
```

What am I missing here? Is this a known bug?
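For what it's worth, I notice the table above is created with only `key` and `value` columns, while the streaming query writes `battery_num`, `ambient_temperature`, `time`, `ID`, and `type`, which is the field the error says is missing. Below is an untested sketch of a DDL whose columns mirror the `from_json` schema; I have not confirmed that this mismatch is what triggers the `ReassignIds` error:

```python
# Untested sketch: create the table with columns matching the from_json
# schema instead of the raw Kafka (key STRING, value STRING) pair, so the
# streaming write and the table agree on field names.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {ICEBERG_TABLE_NAME} (
        battery_num INT,
        ambient_temperature FLOAT,
        time INT,
        ID INT,
        type STRING
    ) USING iceberg
""")
```

If the Iceberg writer is reconciling the incoming fields against the table's existing `(key, value)` schema, that would explain why `battery_num` is reported as not found, but I would like confirmation on the expected behavior.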
