bvaradar commented on issue #1226: [HUDI-238] Make Hudi support Scala 2.12
URL: https://github.com/apache/incubator-hudi/pull/1226#issuecomment-575785278

I tested @zhedoubushishi's changes by building Hudi with Scala 2.12 and ran the Hudi quickstart with those packages against spark-3.0-preview2 (which is built with 2.12). It ran successfully!

(base) varadarb-C02SH0P1G8WL:zhedoubushishi_hudi varadarb$ ~/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-shell \
    --packages org.apache.hudi:hudi-spark-bundle_2.12:0.5.1-SNAPSHOT,org.apache.spark:spark-avro_2.12:3.0.0-preview2 \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
20/01/17 12:22:40 WARN Utils: Your hostname, varadarb-C02SH0P1G8WL resolves to a loopback address: 127.0.0.1; using 172.26.16.136 instead (on interface en0)
20/01/17 12:22:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/varadarb/.ivy2/cache
The jars for the packages stored in: /Users/varadarb/.ivy2/jars
:: loading settings :: url = jar:file:/Users/varadarb/spark-3.0.0-preview2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.hudi#hudi-spark-bundle_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e8c0c949-bada-47b9-a444-5b32205e1ff3;1.0
	confs: [default]
	found org.apache.hudi#hudi-spark-bundle_2.12;0.5.1-SNAPSHOT in local-m2-cache
	found org.apache.spark#spark-avro_2.12;3.0.0-preview2 in central
	found org.spark-project.spark#unused;1.0.0 in spark-list
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.0.0-preview2/spark-avro_2.12-3.0.0-preview2.jar ...
	[SUCCESSFUL ] org.apache.spark#spark-avro_2.12;3.0.0-preview2!spark-avro_2.12.jar (44ms)
:: resolution report :: resolve 1173ms :: artifacts dl 48ms
	:: modules in use:
	org.apache.hudi#hudi-spark-bundle_2.12;0.5.1-SNAPSHOT from local-m2-cache in [default]
	org.apache.spark#spark-avro_2.12;3.0.0-preview2 from central in [default]
	org.spark-project.spark#unused;1.0.0 from spark-list in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   1   |   1   |   0   ||   3   |   1   |
	---------------------------------------------------------------------
:: problems summary ::
:::: ERRORS
	unknown resolver null

:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
:: retrieving :: org.apache.spark#spark-submit-parent-e8c0c949-bada-47b9-a444-5b32205e1ff3
	confs: [default]
	1 artifacts copied, 2 already retrieved (146kB/6ms)
20/01/17 12:22:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://varadarb-c02sh0p1g8wl.corp.uber.com:4040
Spark context available as 'sc' (master = local[*], app id = local-1579292567995).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-preview2
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._

scala> val tableName = "hudi_cow_table"
tableName: String = hudi_cow_table

scala> val basePath = "file:///tmp/hudi_cow_table"
basePath: String = file:///tmp/hudi_cow_table

scala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@1c00809b

scala> val inserts = convertToStringList(dataGen.generateInserts(10))
inserts: java.util.List[String] = [{"ts": 0.0, "uuid": "44e96c81-a406-4429-a56e-d6f6121a1182", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 0.0, "uuid": "4e31e5af-348a-4b39-ada5-e20213e8c307", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 0.0, "uuid": "6f24f083-eff9-4328-b1ee-a54889cfd6e2", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0...
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
warning: there was one deprecation warning (since 2.12.0)
warning: there was one deprecation warning (since 2.2.0)
warning: there were two deprecation warnings in total; for details, enable `:setting -deprecation' or `:replay -deprecation'
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]

scala> df.write.format("org.apache.hudi").
     |   options(getQuickstartWriteConfigs).
     |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     |   option(TABLE_NAME, tableName).
     |   mode(Overwrite).
     |   save(basePath)

scala> val roViewDF = spark.
     |   read.
     |   format("org.apache.hudi").
     |   load(basePath + "/*/*/*/*")
roViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]

scala> roViewDF.createOrReplaceTempView("hudi_ro_table")

scala> spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show()
+------------------+-------------------+-------------------+---+
|              fare|          begin_lon|          begin_lat| ts|
+------------------+-------------------+-------------------+---+
| 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0|
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0|
|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0|
|34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0|
| 41.06290929046368| 0.8192868687714224|  0.651058505660742|0.0|
+------------------+-------------------+-------------------+---+

scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_ro_table").show()
+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|    driver|              fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
|     20200117122356|91e41b3e-1c25-403...|  americas/united_s...|rider-213|driver-213|19.179139106643607|
|     20200117122356|17ee8945-22d2-496...|  americas/united_s...|rider-213|driver-213| 93.56018115236618|
|     20200117122356|129345e8-a885-428...|  americas/united_s...|rider-213|driver-213| 33.92216483948643|
|     20200117122356|420f2771-23b3-4df...|  americas/united_s...|rider-213|driver-213| 27.79478688582596|
|     20200117122356|6f24f083-eff9-432...|  americas/united_s...|rider-213|driver-213| 64.27696295884016|
|     20200117122356|c2a0969f-3381-492...|  americas/brazil/s...|rider-213|driver-213| 66.62084366450246|
|     20200117122356|4e31e5af-348a-4b3...|  americas/brazil/s...|rider-213|driver-213|  43.4923811219014|
|     20200117122356|44e96c81-a406-442...|  americas/brazil/s...|rider-213|driver-213|34.158284716382845|
|     20200117122356|8cd77166-8a33-47c...|    asia/india/chennai|rider-213|driver-213| 41.06290929046368|
|     20200117122356|7efe1602-e219-419...|    asia/india/chennai|rider-213|driver-213|17.851135255091155|
+-------------------+--------------------+----------------------+---------+----------+------------------+

scala> val updates = convertToStringList(dataGen.generateUpdates(10))
updates: java.util.List[String] = [{"ts": 0.0, "uuid": "6f24f083-eff9-4328-b1ee-a54889cfd6e2", "rider": "rider-284", "driver": "driver-284", "begin_lat": 0.7340133901254792, "begin_lon": 0.5142184937933181, "end_lat": 0.7814655558162802, "end_lon": 0.6592596683641996, "fare": 49.527694252432056, "partitionpath": "americas/united_states/san_francisco"}, {"ts": 0.0, "uuid": "44e96c81-a406-4429-a56e-d6f6121a1182", "rider": "rider-284", "driver": "driver-284", "begin_lat": 0.1593867607188556, "begin_lon": 0.010872312870502165, "end_lat": 0.9808530350038475, "end_lon": 0.7963756520507014, "fare": 29.47661370147079, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 0.0, "uuid": "44e96c81-a406-4429-a56e-d6f6121a1182", "rider": "rider-284", "driver": "driver-284", ...

scala> val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
warning: there was one deprecation warning (since 2.12.0)
warning: there was one deprecation warning (since 2.2.0)
warning: there were two deprecation warnings in total; for details, enable `:setting -deprecation' or `:replay -deprecation'
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]

scala> df.write.format("org.apache.hudi").
     |   options(getQuickstartWriteConfigs).
     |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     |   option(TABLE_NAME, tableName).
     |   mode(Append).
     |   save(basePath)

scala> spark.
     |   read.
     |   format("org.apache.hudi").
     |   load(basePath + "/*/*/*/*").
     |   createOrReplaceTempView("hudi_ro_table")

scala> val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)
commits: Array[String] = Array(20200117122356, 20200117122430)

scala> val beginTime = commits(commits.length - 2) // commit time we are interested in
beginTime: String = 20200117122356

scala> // incrementally query data

scala> val incViewDF = spark.
     |   read.
     |   format("org.apache.hudi").
     |   option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
     |   option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
     |   load(basePath)
incViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]

scala> incViewDF.createOrReplaceTempView("hudi_incr_table")

scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
+-------------------+------------------+--------------------+-------------------+---+
|_hoodie_commit_time|              fare|           begin_lon|          begin_lat| ts|
+-------------------+------------------+--------------------+-------------------+---+
|     20200117122430|  90.9053809533154| 0.19949323322922063|0.18294079059016366|0.0|
|     20200117122430|  98.3428192817987|  0.3349917833248327| 0.4777395067707303|0.0|
|     20200117122430|49.527694252432056|  0.5142184937933181| 0.7340133901254792|0.0|
|     20200117122430| 90.25710109008239|  0.4006983139989222|0.08528650347654165|0.0|
|     20200117122430| 63.72504913279929|   0.888493603696927| 0.6570857443423376|0.0|
|     20200117122430| 29.47661370147079|0.010872312870502165| 0.1593867607188556|0.0|
+-------------------+------------------+--------------------+-------------------+---+

scala> val beginTime = "000" // Represents all commits > this time.
beginTime: String = 000

scala> val endTime = commits(commits.length - 2) // commit time we are interested in
endTime: String = 20200117122356

scala> // incrementally query data

scala> val incViewDF = spark.read.format("org.apache.hudi").
     |   option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
     |   option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
     |   option(END_INSTANTTIME_OPT_KEY, endTime).
     |   load(basePath)
incViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]

scala> incViewDF.createOrReplaceTempView("hudi_incr_table")

scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
+-------------------+------------------+-------------------+-------------------+---+
|_hoodie_commit_time|              fare|          begin_lon|          begin_lat| ts|
+-------------------+------------------+-------------------+-------------------+---+
|     20200117122356| 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
|     20200117122356| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0|
|     20200117122356| 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0|
|     20200117122356| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0|
|     20200117122356| 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0|
|     20200117122356|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0|
|     20200117122356|34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0|
|     20200117122356| 41.06290929046368| 0.8192868687714224|  0.651058505660742|0.0|
+-------------------+------------------+-------------------+-------------------+---+

scala>
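Aside: the windowing in the two incremental queries above boils down to picking commit times out of the sorted `commits` array. A minimal, Spark-free sketch of that selection logic (commit-time values copied from the session log; nothing here is Hudi API):

```scala
// Distinct commit times, ascending, as returned by the spark.sql query in the log.
val commits = Array("20200117122356", "20200117122430")

// First incremental pull: start from the second-to-last commit, so only records
// written by the most recent commit are returned.
val beginTime = commits(commits.length - 2)

// Second incremental pull: "000" sorts before every real commit time (commit
// times are lexicographically ordered timestamps), and the end instant caps
// the window at the first commit, excluding the later one.
val allCommits = "000"
val endTime = commits(commits.length - 2)

println(s"window 1: (> $beginTime), window 2: ($allCommits, $endTime]")
```

This is just to make explicit why the first incremental query returns only the `20200117122430` rows while the second returns only the `20200117122356` rows.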
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: [email protected]

With regards,
Apache Git Services
