LucasRoesler commented on issue #7657:
URL: https://github.com/apache/iceberg/issues/7657#issuecomment-1554792856

   Sorry for not including the code snippet earlier. We are using PySpark.
   
   We use `foreachBatch` and then simply append the batch DataFrame to the table like this:
   
   ```py
   (
       df.writeTo(table_identifier)
       .using("iceberg")
       .option("fanout-enabled", True)
       .append()
   )
   ```
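   
   For context, my understanding is that the `fanout-enabled` write option corresponds to Iceberg's `write.spark.fanout.enabled` table property (I'm going by the Iceberg write docs here, so treat that as an assumption), which would mean the same behaviour can also be switched on once at the table level rather than on every write:
   
   ```py
   # Assumed equivalent of .option("fanout-enabled", True): enable the fanout
   # writer via the table property instead of the per-write option.
   # glue.dwh.loading_zone_b is the table shown in the plan further down.
   spark.sql(
       """
       ALTER TABLE glue.dwh.loading_zone_b
       SET TBLPROPERTIES ('write.spark.fanout.enabled' = 'true')
       """
   )
   ```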
   
   The full loader code is below, lightly modified to remove some comments, inline a few config values, etc.:
   
   <details>
   
   ```py
   from pyspark.sql import DataFrame, SparkSession
   from pyspark.sql import functions as F

   # Options, parser, __version__, __tag__ and loader_additional are defined
   # elsewhere in the project and are omitted here.

   LOADING_ZONE_TABLE = "loading_zone_b"
   
   
   def process_batch(spark: SparkSession, opts: Options):
       def process(df: DataFrame, id: int):
           if df.isEmpty():
               return
           
           df = df.withColumns(
               {
                   "loaded_at": F.lit(F.current_timestamp()),
                   "loader_metadata": F.struct(
                       F.lit(__version__).alias("version"),
                       F.lit(__tag__).alias("tag"),
                       F.lit(loader_additional).alias("additional"),
                   ),
               }
           )
   
   
           df = df.drop("processed").withColumnRenamed("processing_error", "loader_error")
   
           cfg = opts.iceberg
           table_identifier = cfg.table_identifier(LOADING_ZONE_TABLE)
   
           (
               df.writeTo(table_identifier)
               .using("iceberg")
               .option("fanout-enabled", True)
               .append()
           )
   
       return process
   
   def loader(spark: SparkSession, opts: Options):
   
       df = (
           spark.readStream.format("kafka")
           .options(**opts.kafka.to_dict())
           .option("subscribe", opts.kafka.topic)
           .load()
       )
   
       df = df.withColumn(
           "topic", 
           F.col("topic").cast("string")
       ).withColumn(
           "value", 
           F.col("value").cast("string")
       )
   
   
       stream = (
           df.select(
               F.col("topic").alias("kafka_topic"),
               F.col("timestamp").alias("kafka_received_at"),
               F.col("timestamp").alias("kafka_timestamp"),
               parser(
                   F.lit(opts.hashing.id_hash_password.get_secret_value()),
                   F.col("value"),
               ).alias("event_data"),
           )
           .select(
               F.col("kafka_topic"),
               F.col("kafka_received_at"),
               F.col("event_data.*"),
           )
           .writeStream.queryName("loader")
           .trigger(processingTime=opts.iceberg.processing_time)
           .option("checkpointLocation", opts.kafka.checkpointLocation)
           .foreachBatch(process_batch(spark, opts))
           .start()
       )
   
       stream.awaitTermination()
   ```
   
   </details>
   
   You mentioned that the plan should give details about where the ExistingRDD is coming from, but since I am still somewhat new to Spark, I can't spot it when I look at the plan:
   
   ```txt
   == Physical Plan ==
   AppendData (5)
   +- Sort (4)
      +- Exchange (3)
         +- Project (2)
            +- * Scan ExistingRDD (1)
   
   
   (1) Scan ExistingRDD [codegen id : 1]
   Output [17]: [kafka_topic#440073, kafka_received_at#440074, 
processed#440075, processing_error#440076, source#440077, 
source_event_id#440078, recorded_at#440079, recorded_by#440080, token#440081, 
event_type#440082, event_name#440083, event_type_version#440084, 
person_id#440085, ids#440086, collector#440087, event#440088, raw#440089]
   Arguments: [kafka_topic#440073, kafka_received_at#440074, processed#440075, 
processing_error#440076, source#440077, source_event_id#440078, 
recorded_at#440079, recorded_by#440080, token#440081, event_type#440082, 
event_name#440083, event_type_version#440084, person_id#440085, ids#440086, 
collector#440087, event#440088, raw#440089], SQLExecutionRDD[35818] at start at 
<unknown>:0, ExistingRDD, UnknownPartitioning(0)
   
   (2) Project
   Output [18]: [kafka_topic#440073, kafka_received_at#440074, source#440077, 
source_event_id#440078, recorded_at#440079, recorded_by#440080, token#440081, 
event_type#440082, event_name#440083, event_type_version#440084, 
person_id#440085, transform(ids#440086, lambdafunction(if (isnull(lambda 
x#440204)) null else named_struct(id, lambda x#440204.id, id_type, lambda 
x#440204.id_type), lambda x#440204, false)) AS ids#440211, if 
(isnull(collector#440087)) null else named_struct(id, collector#440087.id, 
received_at, collector#440087.received_at, hash, collector#440087.hash) AS 
collector#440218, event#440088, raw#440089, 2023-05-19 15:53:01.001032 AS 
loaded_at#440110, processing_error#440076 AS loader_error#440149, 
[3.18.4,v3.18.4,{}] AS loader_metadata#440225]
   Input [17]: [kafka_topic#440073, kafka_received_at#440074, processed#440075, 
processing_error#440076, source#440077, source_event_id#440078, 
recorded_at#440079, recorded_by#440080, token#440081, event_type#440082, 
event_name#440083, event_type_version#440084, person_id#440085, ids#440086, 
collector#440087, event#440088, raw#440089]
   
   (3) Exchange
   Input [18]: [kafka_topic#440073, kafka_received_at#440074, source#440077, 
source_event_id#440078, recorded_at#440079, recorded_by#440080, token#440081, 
event_type#440082, event_name#440083, event_type_version#440084, 
person_id#440085, ids#440211, collector#440218, event#440088, raw#440089, 
loaded_at#440110, loader_error#440149, loader_metadata#440225]
   Arguments: hashpartitioning(iceberghourtransform(loaded_at#440110), 200), 
REPARTITION_BY_NUM, [plan_id=199587]
   
   (4) Sort
   Input [18]: [kafka_topic#440073, kafka_received_at#440074, source#440077, 
source_event_id#440078, recorded_at#440079, recorded_by#440080, token#440081, 
event_type#440082, event_name#440083, event_type_version#440084, 
person_id#440085, ids#440211, collector#440218, event#440088, raw#440089, 
loaded_at#440110, loader_error#440149, loader_metadata#440225]
   Arguments: [iceberghourtransform(loaded_at#440110) ASC NULLS FIRST], false, 0
   
   (5) AppendData
   Input [18]: [kafka_topic#440073, kafka_received_at#440074, source#440077, 
source_event_id#440078, recorded_at#440079, recorded_by#440080, token#440081, 
event_type#440082, event_name#440083, event_type_version#440084, 
person_id#440085, ids#440211, collector#440218, event#440088, raw#440089, 
loaded_at#440110, loader_error#440149, loader_metadata#440225]
   Arguments: 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5479/0x0000000841df1c40@a4ed326,
 IcebergWrite(table=glue.dwh.loading_zone_b, format=PARQUET)
   ```
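   
   My only guess is that the ExistingRDD is the micro-batch DataFrame that `foreachBatch` hands to the callback itself, but I may be reading that wrong. If it helps, I can print that DataFrame's plan from inside the callback before any of our own transformations run; a minimal sketch of what I would add to `process`:
   
   ```py
   from pyspark.sql import DataFrame


   def process(df: DataFrame, id: int):
       if df.isEmpty():
           return

       # Print the plan of the raw micro-batch DataFrame to see exactly what
       # the ExistingRDD scan wraps (mode="formatted" is available in Spark 3.0+).
       df.explain(mode="formatted")

       # ... the rest of the batch processing from process_batch() above
   ```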

