rangareddy commented on code in PR #18939: URL: https://github.com/apache/hudi/pull/18939#discussion_r3488264318
########## hudi-notebooks/notebooks/spark4/utils.py: ########## @@ -0,0 +1,262 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Spark 4 notebook helpers. Copied into the notebook home as utils.py when +# building the apachehudi/spark4-hudi (Spark 4) image. 
+ +import os +import urllib.request + +from pyspark.sql import SparkSession +from IPython.display import HTML, display as display_html + +_spark = None + +# Default number of rows to show in display() +DEFAULT_DISPLAY_ROWS = 100 + +# Reused HTML/CSS for DataFrame display (avoids string rebuild on every call) +_DISPLAY_TABLE_CSS = """ +<style> + .dataframe { + border-radius: 0.5rem; + box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06); + overflow-x: auto; + border: 1px solid #e2e8f0; + } + .dataframe th { + background-color: #f1f5f9; + color: #1f2937; + font-weight: 600; + padding: 0.75rem 1.5rem; + text-align: left; + border-bottom: 2px solid #e2e8f0; + } + .dataframe td { + padding: 0.75rem 1.5rem; + border-bottom: 1px solid #e2e8f0; + } + .dataframe tr:nth-child(even) { + background-color: #f8fafc; + } + .dataframe tr:hover { + background-color: #e2e8f0; + transition: background-color 0.2s ease-in-out; + } +</style> +""" + + +def get_spark_session( + app_name="Hudi-Notebooks", + log_level="WARN", + hudi_version=None, +): + """ + Initialize a SparkSession (singleton). + + Connects to the in-container Spark standalone master started by entrypoint.sh. + The master URL, driver memory (4g) and executor memory (4g) are configured in + $SPARK_HOME/conf/spark-defaults.conf. + + Parameters: + - app_name (str): Optional name for the Spark application. + - log_level (str): Log level for Spark (DEBUG, INFO, WARN, ERROR). Defaults to WARN. + - hudi_version (str): Hudi bundle version. Defaults to the HUDI_VERSION baked into + the image (1.1.1 on the Spark 4 image). 
+ + Returns: + - SparkSession object + """ + global _spark + + if _spark is not None: + return _spark + + if hudi_version is None: + hudi_version = os.getenv("HUDI_VERSION", "1.1.1") + + hudi_home = os.getenv("HUDI_HOME", "/opt/hudi") + spark_version = os.getenv("SPARK_VERSION", "4.0.2") + spark_minor_version = ".".join(spark_version.split(".")[:2]) + scala_version = os.getenv("SCALA_VERSION", "2.13") + bundle_name = f"hudi-spark{spark_minor_version}-bundle_{scala_version}" + bundle_jar = f"{bundle_name}-{hudi_version}.jar" + + # Resolve the Hudi bundle to a local path. The image pre-downloads it under + # $HUDI_HOME; if it is missing, fetch it from Maven Central now. + hudi_local_jar = os.path.join(hudi_home, hudi_version, bundle_jar) + if not os.path.exists(hudi_local_jar): + os.makedirs(os.path.dirname(hudi_local_jar), exist_ok=True) + hudi_jar_url = ( + f"https://repo1.maven.org/maven2/org/apache/hudi/" + f"{bundle_name}/{hudi_version}/{bundle_jar}" + ) + print(f"Hudi bundle not found at {hudi_local_jar}; downloading {hudi_jar_url} ...") + urllib.request.urlretrieve(hudi_jar_url, hudi_local_jar) + + lance_version = "0.5.0" + lance_home = "/opt/lance" + lance_jar = f"lance-spark-bundle-{spark_minor_version}_{scala_version}-{lance_version}.jar" + lance_local_jar = os.path.join(lance_home, lance_version, lance_jar) + if not os.path.exists(lance_local_jar): + os.makedirs(os.path.dirname(lance_local_jar), exist_ok=True) + lance_jar_url = ( + f"https://repo1.maven.org/maven2/org/lance/lance-spark-bundle-{spark_minor_version}_{scala_version}/{lance_version}/{lance_jar}" + ) + print(f"Lance bundle not found at {lance_local_jar}; downloading {lance_jar_url} ...") + urllib.request.urlretrieve(lance_jar_url, lance_local_jar) + + extraclasspath = f"{hudi_local_jar}:{lance_local_jar}" + _spark = ( + SparkSession.builder.appName(app_name) + .config("spark.driver.extraClassPath", extraclasspath) + .config("spark.executor.extraClassPath", extraclasspath) + 
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") + .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") + .enableHiveSupport() + .getOrCreate() + ) + + _spark.sparkContext.setLogLevel(log_level) + print( + f"SparkSession started with app name: {app_name}, " + f"Spark version: {spark_version}, Hudi version: {hudi_version}" + ) + + return _spark + + +def stop_spark_session(): + """Stop the global SparkSession and clear the singleton.""" + global _spark + if _spark is not None: + _spark.stop() + _spark = None + print("SparkSession stopped successfully.") + + +def ls(base_path): + """ + List files or directories at the given MinIO S3 path. + + Args: + base_path: Path starting with 's3a://' (e.g. s3a://warehouse/hudi_table/). + """ + #if not base_path.startswith("s3a://"): + # raise ValueError("Path must start with 's3a://'") + + global _spark + if _spark is None: + raise RuntimeError("SparkSession not initialized. Call get_spark_session() first.") + + try: + hadoop_conf = _spark._jsc.hadoopConfiguration() + fs = _spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf) + p = _spark._jvm.org.apache.hadoop.fs.Path(base_path) + if not fs.exists(p): + print(f"Path does not exist: {base_path}") + return [] + status = fs.listStatus(p) + files = [str(file.getPath()) for file in status] + for f in files: + print(f) + except Exception as e: + print(f"Exception occurred while listing files from path {base_path}", e) + + +def drop_table(table_name: str = None, table_path: str = None): + try: + _spark.sql(f"DROP TABLE IF EXISTS {table_name}") + print(f"✓ Table '{table_name}' dropped successfully.") Review Comment: Fixed this review comment in the latest PR. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
