soumilshah1995 commented on issue #10110:
URL: https://github.com/apache/hudi/issues/10110#issuecomment-2247963537
# Code
```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType
from pyspark.sql.functions import hour, col
from datetime import datetime, timedelta
import os
import sys
import random
# Configuration
HUDI_VERSION = '1.0.0-beta2'
SPARK_VERSION = '3.4'
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
# Initialize Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
# Generate mock event data
def generate_event_data(num_events):
    event_types = ["click", "view", "purchase", "signup"]
    start_time = datetime(2023, 1, 1)
    data = []
    for i in range(num_events):
        event = {
            "event_id": i + 1,
            "user_id": random.randint(1, 100),
            "event_type": random.choice(event_types),
            "timestamp": (start_time + timedelta(hours=random.randint(0, 5000))).strftime("%Y-%m-%d %H:%M:%S")
        }
        data.append(event)
    return data
# Create DataFrame
num_events = 10000
events_data = generate_event_data(num_events)
df = spark.createDataFrame(events_data)
df.show()
# Write DataFrame to Hudi table
table_name = "web_events"
path = f'file:///Users/soumilshah/Desktop/{table_name}/'
df.write.format("hudi") \
    .option("hoodie.table.name", table_name) \
    .option("hoodie.datasource.write.recordkey.field", "event_id") \
    .option("hoodie.datasource.write.partitionpath.field", "") \
    .option("hoodie.datasource.write.precombine.field", "timestamp") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .option("path", path) \
    .mode("overwrite") \
    .saveAsTable(table_name)
# Create functional index on timestamp column
query_create_ts_datestr = """
CREATE INDEX IF NOT EXISTS ts_datestr ON web_events
USING column_stats(timestamp)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd')
"""
spark.sql(query_create_ts_datestr).show()
# Query data for a specific date
spark.sql("""
SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show()
# Explain query plan for date-based query
spark.sql("""
EXPLAIN
SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show(truncate=False)
# Create functional index on hour of timestamp
query_create_ts_hour = """
CREATE INDEX ts_hour ON web_events
USING column_stats(timestamp)
OPTIONS(func='hour')
"""
spark.sql(query_create_ts_hour)
# Query data aggregated by hour
spark.sql("""
SELECT hour(timestamp) AS hour_of_day, COUNT(*) AS event_count
FROM web_events
GROUP BY hour(timestamp)
""").show()
```
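As a side note, here is how I have been sanity-checking that the index got created at all. This is only a sketch based on my assumption that the functional index shows up as a partition under the metadata table path (`.hoodie/metadata`); I could not confirm the exact partition naming:
```
import os

# Hypothetical check: list the metadata table partitions under the table's
# .hoodie/metadata directory and look for one named after the index
# (e.g. something like 'func_index_ts_datestr'). The exact naming is an
# assumption on my part, not documented behavior.
metadata_path = "/Users/soumilshah/Desktop/web_events/.hoodie/metadata"
if os.path.isdir(metadata_path):
    for entry in sorted(os.listdir(metadata_path)):
        print(entry)
else:
    print("metadata table path not found:", metadata_path)
```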
# Questions
How do I verify that the query is using the functional index?
```
spark.sql("""
EXPLAIN
SELECT event_type, user_id, event_id
FROM web_events
WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""").show(truncate=False)
== Physical Plan ==
*(1) Project [event_type#126, user_id#128L, event_id#125L]
+- *(1) Filter (isnotnull(timestamp#127) AND (date_format(cast(timestamp#127 as timestamp), yyyy-MM-dd, Some(America/New_York)) = 2023-06-17))
   +- *(1) ColumnarToRow
      +- FileScan parquet spark_catalog.default.web_events[event_id#125L,event_type#126,timestamp#127,user_id#128L] Batched: true, DataFilters: [isnotnull(timestamp#127), (date_format(cast(timestamp#127 as timestamp), yyyy-MM-dd, Some(Americ..., Format: Parquet, Location: HoodieFileIndex(1 paths)[file:/Users/soumilshah/Desktop/web_events], PartitionFilters: [], PushedFilters: [IsNotNull(timestamp)], ReadSchema: struct<event_id:bigint,event_type:string,timestamp:string,user_id:bigint>
```
```
spark.sql("""
EXPLAIN
SELECT hour(timestamp) AS hour_of_day, COUNT(*) AS event_count
FROM web_events
GROUP BY hour(timestamp)
""").show(truncate=False)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[_groupingexpression#242], functions=[count(1)])
   +- Exchange hashpartitioning(_groupingexpression#242, 200), ENSURE_REQUIREMENTS, [plan_id=240]
      +- HashAggregate(keys=[_groupingexpression#242], functions=[partial_count(1)])
         +- Project [hour(cast(timestamp#217 as timestamp), Some(America/New_York)) AS _groupingexpression#242]
            +- FileScan parquet spark_catalog.default.web_events[timestamp#217] Batched: true, DataFilters: [], Format: Parquet, Location: HoodieFileIndex(1 paths)[file:/Users/soumilshah/Desktop/web_events], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<timestamp:string>
```
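One workaround I considered for checking data skipping (a sketch, assuming `DataFrame.inputFiles()` reflects the file listing after Hudi's pruning, which I have not confirmed) is to compare how many files each query touches:
```
# Compare how many data files each query plans to read. If the functional
# index enables data skipping, the filtered query should touch fewer files
# than a full scan. Assumption: inputFiles() reflects the pruned listing.
full_scan = spark.sql("SELECT event_id FROM web_events")
filtered = spark.sql("""
    SELECT event_id FROM web_events
    WHERE date_format(timestamp, 'yyyy-MM-dd') = '2023-06-17'
""")
print("files (full scan):", len(full_scan.inputFiles()))
print("files (filtered): ", len(filtered.inputFiles()))
```
On a table this small everything may land in a single file, so the counts could match even if the index is working.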
If someone could help me understand how to verify that the functional indexes are working, I would greatly appreciate it. I'm not an expert with the Spark UI, so any guidance or pointers would be very helpful.
@ad1happy2go