LucasRoesler commented on issue #7657:
URL: https://github.com/apache/iceberg/issues/7657#issuecomment-1554792856
Sorry about not including the code snippet earlier. We are using PySpark with
`foreachBatch`, appending each batch dataframe to the table like this:
```py
(
    df.writeTo(table_identifier)
    .using("iceberg")
    .option("fanout-enabled", True)
    .append()
)
```
The full loader code is below, lightly modified to remove some comments,
in-line a few config values, etc.:
<details>
```py
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F

# Options, parser, __version__, __tag__, and loader_additional are defined
# elsewhere in the project and omitted here.

LOADING_ZONE_TABLE = "loading_zone_b"


def process_batch(spark: SparkSession, opts: Options):
    def process(df: DataFrame, id: int):
        if df.isEmpty():
            return

        df = df.withColumns(
            {
                "loaded_at": F.lit(F.current_timestamp()),
                "loader_metadata": F.struct(
                    F.lit(__version__).alias("version"),
                    F.lit(__tag__).alias("tag"),
                    F.lit(loader_additional).alias("additional"),
                ),
            }
        )
        df = df.drop("processed").withColumnRenamed("processing_error", "loader_error")

        cfg = opts.iceberg
        table_identifier = cfg.table_identifier(LOADING_ZONE_TABLE)
        (
            df.writeTo(table_identifier)
            .using("iceberg")
            .option("fanout-enabled", True)
            .append()
        )

    return process


def loader(spark: SparkSession, opts: Options):
    df = (
        spark.readStream.format("kafka")
        .options(**opts.kafka.to_dict())
        .option("subscribe", opts.kafka.topic)
        .load()
    )
    df = df.withColumn(
        "topic",
        F.col("topic").cast("string"),
    ).withColumn(
        "value",
        F.col("value").cast("string"),
    )
    stream = (
        df.select(
            F.col("topic").alias("kafka_topic"),
            F.col("timestamp").alias("kafka_received_at"),
            F.col("timestamp").alias("kafka_timestamp"),
            parser(
                F.lit(opts.hashing.id_hash_password.get_secret_value()),
                F.col("value"),
            ).alias("event_data"),
        )
        .select(
            F.col("kafka_topic"),
            F.col("kafka_received_at"),
            F.col("event_data.*"),
        )
        .writeStream.queryName("loader")
        .trigger(processingTime=opts.iceberg.processing_time)
        .option("checkpointLocation", opts.kafka.checkpointLocation)
        .foreachBatch(process_batch(spark, opts))
        .start()
    )
    stream.awaitTermination()
```
</details>
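For context, the target table is partitioned by hour on `loaded_at` (it shows up
as `iceberghourtransform(loaded_at)` in the plan below). A rough sketch of the
DDL, with the column list heavily abbreviated and the types only approximate:
```py
# Sketch only: the real table has the full event schema. The table name and the
# hours(loaded_at) partition spec match what the write plan reports.
spark.sql(
    """
    CREATE TABLE IF NOT EXISTS glue.dwh.loading_zone_b (
        kafka_topic        string,
        kafka_received_at  timestamp,
        loaded_at          timestamp,
        loader_error       string
    )
    USING iceberg
    PARTITIONED BY (hours(loaded_at))
    """
)
```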
You mention that the plan should give details about where the ExistingRDD is
coming from, but since I am still somewhat new to Spark, I don't recognize it
when I look at the plan:
```txt
== Physical Plan ==
AppendData (5)
+- Sort (4)
   +- Exchange (3)
      +- Project (2)
         +- * Scan ExistingRDD (1)

(1) Scan ExistingRDD [codegen id : 1]
Output [17]: [kafka_topic#440073, kafka_received_at#440074,
processed#440075, processing_error#440076, source#440077,
source_event_id#440078, recorded_at#440079, recorded_by#440080, token#440081,
event_type#440082, event_name#440083, event_type_version#440084,
person_id#440085, ids#440086, collector#440087, event#440088, raw#440089]
Arguments: [kafka_topic#440073, kafka_received_at#440074, processed#440075,
processing_error#440076, source#440077, source_event_id#440078,
recorded_at#440079, recorded_by#440080, token#440081, event_type#440082,
event_name#440083, event_type_version#440084, person_id#440085, ids#440086,
collector#440087, event#440088, raw#440089], SQLExecutionRDD[35818] at start at
<unknown>:0, ExistingRDD, UnknownPartitioning(0)

(2) Project
Output [18]: [kafka_topic#440073, kafka_received_at#440074, source#440077,
source_event_id#440078, recorded_at#440079, recorded_by#440080, token#440081,
event_type#440082, event_name#440083, event_type_version#440084,
person_id#440085, transform(ids#440086, lambdafunction(if (isnull(lambda
x#440204)) null else named_struct(id, lambda x#440204.id, id_type, lambda
x#440204.id_type), lambda x#440204, false)) AS ids#440211, if
(isnull(collector#440087)) null else named_struct(id, collector#440087.id,
received_at, collector#440087.received_at, hash, collector#440087.hash) AS
collector#440218, event#440088, raw#440089, 2023-05-19 15:53:01.001032 AS
loaded_at#440110, processing_error#440076 AS loader_error#440149,
[3.18.4,v3.18.4,{}] AS loader_metadata#440225]
Input [17]: [kafka_topic#440073, kafka_received_at#440074, processed#440075,
processing_error#440076, source#440077, source_event_id#440078,
recorded_at#440079, recorded_by#440080, token#440081, event_type#440082,
event_name#440083, event_type_version#440084, person_id#440085, ids#440086,
collector#440087, event#440088, raw#440089]

(3) Exchange
Input [18]: [kafka_topic#440073, kafka_received_at#440074, source#440077,
source_event_id#440078, recorded_at#440079, recorded_by#440080, token#440081,
event_type#440082, event_name#440083, event_type_version#440084,
person_id#440085, ids#440211, collector#440218, event#440088, raw#440089,
loaded_at#440110, loader_error#440149, loader_metadata#440225]
Arguments: hashpartitioning(iceberghourtransform(loaded_at#440110), 200),
REPARTITION_BY_NUM, [plan_id=199587]

(4) Sort
Input [18]: [kafka_topic#440073, kafka_received_at#440074, source#440077,
source_event_id#440078, recorded_at#440079, recorded_by#440080, token#440081,
event_type#440082, event_name#440083, event_type_version#440084,
person_id#440085, ids#440211, collector#440218, event#440088, raw#440089,
loaded_at#440110, loader_error#440149, loader_metadata#440225]
Arguments: [iceberghourtransform(loaded_at#440110) ASC NULLS FIRST], false, 0

(5) AppendData
Input [18]: [kafka_topic#440073, kafka_received_at#440074, source#440077,
source_event_id#440078, recorded_at#440079, recorded_by#440080, token#440081,
event_type#440082, event_name#440083, event_type_version#440084,
person_id#440085, ids#440211, collector#440218, event#440088, raw#440089,
loaded_at#440110, loader_error#440149, loader_metadata#440225]
Arguments:
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5479/0x0000000841df1c40@a4ed326,
IcebergWrite(table=glue.dwh.loading_zone_b, format=PARQUET)
```
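For reference, here is roughly where `explain()` calls could go to capture these
plans again; this is a sketch rather than exactly what I ran, and
`explain(mode=...)` needs Spark 3.0+:
```py
from pyspark.sql import DataFrame

# Same table as in the write plan above.
table_identifier = "glue.dwh.loading_zone_b"


def process(df: DataFrame, id: int):
    if df.isEmpty():
        return
    # ... same enrichment / rename steps as in the loader above ...

    # Plan of the micro-batch DataFrame as it arrives in the foreachBatch
    # callback; the write plan itself shows up in the Spark UI SQL tab.
    df.explain(mode="formatted")

    (
        df.writeTo(table_identifier)
        .using("iceberg")
        .option("fanout-enabled", True)
        .append()
    )

# After the query is started in loader(), the streaming (Kafka -> foreachBatch)
# side can be printed with: stream.explain(extended=True)
```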