shivaprasad-basavaraj opened a new issue, #6974:
URL: https://github.com/apache/iceberg/issues/6974

   ### Apache Iceberg version
   
   1.0.0
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   I am trying to write an application which reads from a kafka topic and 
pushes down the data to an iceberg table. The event data from kafka would be in 
a JSON format. Hence I have to process the stream data frame using the 
from_json() functions so that the iceberg tables have the correct schema 
instead of dumping a json string. My test application below works fine if I 
output to a console. 
   
   Code
   ```python
   import pyspark
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import from_json, col
   from pyspark.sql.types import StringType, StructType, StructField, 
IntegerType, FloatType
   import sys
   
   NESSIE_URI = "http://10.164.64.154:30324/api/v1";
   AWS_ACCESS_KEY = "console" ## AWS CREDENTIALS
   AWS_SECRET_KEY = "console123" ## AWS CREDENTIALS
   MINIO_ENDPOINT = "http://10.164.64.27:32000"; ## POINT TO MINIO
   
   KAFKA_TOPIC = sys.argv[1]
   ICEBERG_TABLE_NAME = "nessie.icebergdb." + KAFKA_TOPIC
   
   print("------------------------------------------------------")
   print("Processing spark job for topic " + KAFKA_TOPIC)
   print("------------------------------------------------------")
   
   
   conf = (
       pyspark.SparkConf()
           .setAppName(KAFKA_TOPIC)
           .set('spark.sql.extensions', 
'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
           .set('spark.sql.catalog.nessie', 
'org.apache.iceberg.spark.SparkCatalog')
           .set('spark.sql.catalog.nessie.uri', NESSIE_URI)
           .set('spark.sql.catalog.nessie.ref', 'main')
           .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
           .set('spark.sql.catalog.nessie.catalog-impl', 
'org.apache.iceberg.nessie.NessieCatalog')
           .set('spark.sql.catalog.nessie.warehouse', 's3a://iceberg')
           .set('spark.sql.catalog.nessie.s3.endpoint', MINIO_ENDPOINT)
           .set('spark.sql.catalog.nessie.io-impl', 
'org.apache.iceberg.aws.s3.S3FileIO')
   )
   
   spark = SparkSession.builder.config(conf=conf).getOrCreate()
   spark.sparkContext.setLogLevel('WARN')
   
   print("Spark Session created")
   nessie_branch = 'test_json_df'
   spark.sql(f"CREATE BRANCH IF NOT EXISTS {nessie_branch} IN nessie FROM main")
   refs_list = spark.sql("LIST REFERENCES IN nessie")
   refs_list.show()
   spark.sql(f"USE REFERENCE {nessie_branch} IN nessie")
   
   
   spark.sql(f"CREATE TABLE IF NOT EXISTS {ICEBERG_TABLE_NAME} \
             (key STRING, value STRING) USING iceberg")
   
   spark.sql(f"CREATE TABLE IF NOT EXISTS {ICEBERG_TABLE_NAME} \
             (key STRING, value STRING) USING iceberg")
   
   print("-------------------------------------------------")
   print("Reading from topic " + KAFKA_TOPIC)
   print("-------------------------------------------------")
   
   df = spark \
       .readStream \
       .format("kafka") \
       .option("kafka.bootstrap.servers", 
"frenchie:31092,kimiko:31092,hughie:31092") \
       .option("subscribe", KAFKA_TOPIC) \
       .option("startingOffsets","earliest") \
       .load()
   
   print("Spark read stream created")
   
   schema = StructType([ 
       StructField("battery_num",IntegerType(),True), 
       StructField("ambient_temperature",FloatType(),True), 
       StructField("time",IntegerType(),True), 
       StructField("ID", IntegerType(), True),
       StructField("type", StringType(), True),
     ])
   
   query = df \
       .selectExpr("CAST(value AS STRING)") \
       .select(from_json(col("value"), schema).alias("data")) \
       .select("data.*") \
       .writeStream \
       .format("console") \
       .option("truncate","false") \
       .start() \
       .awaitTermination()
   ```
   
   Output
   ```
   -------------------------------------------------
   Reading from topic events
   -------------------------------------------------
   Spark read stream created
   StructType([StructField('key', BinaryType(), True), StructField('value', 
BinaryType(), True), StructField('topic', StringType(), True), 
StructField('partition', IntegerType(), True), StructField('offset', 
LongType(), True), StructField('timestamp', TimestampType(), True), 
StructField('timestampType', IntegerType(), True)])
   23/03/01 13:07:57 WARN ResolveWriteToStream: Temporary checkpoint location 
created which is deleted normally when the query didn't fail: 
/tmp/temporary-c930c979-fbed-4118-be26-36e0778bb637. If it's required to delete 
it under any circumstances, please set 
spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to 
know deleting temp checkpoint folder is best effort.
   23/03/01 13:07:57 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is 
not supported in streaming DataFrames/Datasets and will be disabled.
   -------------------------------------------
   Batch: 0
   -------------------------------------------
   +-----------+-------------------+----+---+---------+
   |battery_num|ambient_temperature|time|ID |type     |
   +-----------+-------------------+----+---+---------+
   |1          |0.9596235          |0   |0  |discharge|
   |1          |7.16299            |3   |1  |impedance|
   |1          |14.47257           |4   |2  |impedance|
   |1          |33.66843           |5   |3  |impedance|
   |1          |-5.42997           |6   |4  |discharge|
   |1          |-5.296253          |8   |5  |charge   |
   |1          |-3.0481791         |11  |6  |impedance|
   |1          |-6.6242948         |12  |7  |discharge|
   |1          |36.216015          |14  |8  |discharge|
   |1          |38.60477           |16  |9  |charge   |
   +-----------+-------------------+----+---+---------+
   ```
   
   However if I try to write this table to iceberg by replacing the writeStream 
code with the code below
   ```python
   query = df \
       .selectExpr("CAST(value AS STRING)") \
       .select(from_json(col("value"), schema).alias("data")) \
       .select("data.*") \
       .writeStream \
       .format("iceberg") \
       .trigger(processingTime='1 seconds')\
       .option("path", ICEBERG_TABLE_NAME)\
       .option("checkpointLocation", "/tmp/checkpoints")\
       .start() \
       .awaitTermination()
   ```
   
   I get the following error:
   ```
   -------------------------------------------------
   Reading from topic events
   -------------------------------------------------
   Spark read stream created
   StructType([StructField('key', BinaryType(), True), StructField('value', 
BinaryType(), True), StructField('topic', StringType(), True), 
StructField('partition', IntegerType(), True), StructField('offset', 
LongType(), True), StructField('timestamp', TimestampType(), True), 
StructField('timestampType', IntegerType(), True)])
   23/03/01 13:08:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is 
not supported in streaming DataFrames/Datasets and will be disabled.
   23/03/01 13:08:59 ERROR MicroBatchExecution: Query [id = 
f14cf37d-1a9b-4340-bc5c-9a5ddb0753a3, runId = 
77a1d9c7-f9ea-4eba-a722-70b3fa2cb023] terminated with error
   java.lang.IllegalArgumentException: Field battery_num not found in source 
schema
           at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:103)
           at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:27)
           at 
org.apache.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:571)
           at 
org.apache.iceberg.relocated.com.google.common.collect.Iterators$6.transform(Iterators.java:826)
           at 
org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:52)
           at 
org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:367)
           at 
org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:147)
           at 
org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:133)
   ....
   ```
   
   What am I missing here? Is this a known bug?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to