bvaradar commented on issue #1226: [HUDI-238] Make Hudi support Scala 2.12 URL: https://github.com/apache/incubator-hudi/pull/1226#issuecomment-575802938 This is the same set of tests with spark-2.4.4, scala_2.11 and hudi (base) varadarb-C02SH0P1G8WL:zhedoubushishi_hudi varadarb$ ~/spark-2.4.4-bin-hadoop2.7/bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-SNAPSHOT,org.apache.spark:spark-avro_2.11:2.4.2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' Ivy Default Cache set to: /Users/varadarb/.ivy2/cache The jars for the packages stored in: /Users/varadarb/.ivy2/jars :: loading settings :: url = jar:file:/Users/varadarb/spark-2.4.4-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml org.apache.hudi#hudi-spark-bundle_2.11 added as a dependency org.apache.spark#spark-avro_2.11 added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-db5734f8-665a-44e6-a55c-d389e9c9b0d4;1.0 confs: [default] found org.apache.hudi#hudi-spark-bundle_2.11;0.5.1-SNAPSHOT in local-m2-cache found org.apache.spark#spark-avro_2.11;2.4.2 in central found org.spark-project.spark#unused;1.0.0 in spark-list :: resolution report :: resolve 233ms :: artifacts dl 6ms :: modules in use: org.apache.hudi#hudi-spark-bundle_2.11;0.5.1-SNAPSHOT from local-m2-cache in [default] org.apache.spark#spark-avro_2.11;2.4.2 from central in [default] org.spark-project.spark#unused;1.0.0 from spark-list in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 3 | 0 | 0 | 0 || 3 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent-db5734f8-665a-44e6-a55c-d389e9c9b0d4 confs: [default] 0 artifacts copied, 3 already retrieved (0kB/5ms) 20/01/17 13:30:05 WARN 
Utils: Your hostname, varadarb-C02SH0P1G8WL resolves to a loopback address: 127.0.0.1; using 172.26.16.136 instead (on interface en0) 20/01/17 13:30:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 20/01/17 13:30:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://varadarb-c02sh0p1g8wl.corp.uber.com:4040 Spark context available as 'sc' (master = local[*], app id = local-1579296611401). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.4 /_/ Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191) Type in expressions to have them evaluated. Type :help for more information. 
scala> import org.apache.hudi.QuickstartUtils._ import org.apache.hudi.QuickstartUtils._ scala> import scala.collection.JavaConversions._ import scala.collection.JavaConversions._ scala> import org.apache.spark.sql.SaveMode._ import org.apache.spark.sql.SaveMode._ scala> import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceReadOptions._ scala> import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.DataSourceWriteOptions._ scala> import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.config.HoodieWriteConfig._ scala> scala> val tableName = "hudi_cow_table" tableName: String = hudi_cow_table scala> val basePath = "file:///tmp/hudi_cow_table" basePath: String = file:///tmp/hudi_cow_table scala> val dataGen = new DataGenerator dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@22c4151b scala> val inserts = convertToStringList(dataGen.generateInserts(10)) inserts: java.util.List[String] = [{"ts": 0.0, "uuid": "d41c911e-41b5-4c54-ac4f-492623c5a8fe", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 0.0, "uuid": "c59f096d-ec05-4a18-aac4-67866c007925", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 0.0, "uuid": "6e340dc7-0992-454c-a069-816b8de93a22", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.5731835407930634, "begin_... 
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) warning: there was one deprecation warning; re-run with -deprecation for details df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields] scala> df.write.format("org.apache.hudi"). | options(getQuickstartWriteConfigs). | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | option(TABLE_NAME, tableName). | mode(Overwrite). | save(basePath); scala> val roViewDF = spark. | read. | format("org.apache.hudi"). | load(basePath + "/*/*/*/*") roViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields] scala> roViewDF.createOrReplaceTempView("hudi_ro_table") scala> spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show() 20/01/17 13:30:53 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0 20/01/17 13:30:53 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException +------------------+-------------------+-------------------+---+ | fare| begin_lon| begin_lat| ts| +------------------+-------------------+-------------------+---+ | 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0| | 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0| | 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0| | 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0| | 43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0| | 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0| |34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0| | 41.06290929046368| 0.8192868687714224| 0.651058505660742|0.0| +------------------+-------------------+-------------------+---+ scala> spark.sql("select _hoodie_commit_time, 
_hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_ro_table").show() +-------------------+--------------------+----------------------+---------+----------+------------------+ |_hoodie_commit_time| _hoodie_record_key|_hoodie_partition_path| rider| driver| fare| +-------------------+--------------------+----------------------+---------+----------+------------------+ | 20200117133031|6e340dc7-0992-454...| americas/united_s...|rider-213|driver-213| 64.27696295884016| | 20200117133031|be18b898-c04c-4bf...| americas/united_s...|rider-213|driver-213| 93.56018115236618| | 20200117133031|73ba1d25-2cc5-407...| americas/united_s...|rider-213|driver-213| 27.79478688582596| | 20200117133031|82dbd63c-a6b6-4f7...| americas/united_s...|rider-213|driver-213| 33.92216483948643| | 20200117133031|da517676-c854-495...| americas/united_s...|rider-213|driver-213|19.179139106643607| | 20200117133031|c59f096d-ec05-4a1...| americas/brazil/s...|rider-213|driver-213| 43.4923811219014| | 20200117133031|cd150bb8-7f29-4e5...| americas/brazil/s...|rider-213|driver-213| 66.62084366450246| | 20200117133031|d41c911e-41b5-4c5...| americas/brazil/s...|rider-213|driver-213|34.158284716382845| | 20200117133031|31a5958c-2fac-4bf...| asia/india/chennai|rider-213|driver-213|17.851135255091155| | 20200117133031|95cbb4b3-118c-4d6...| asia/india/chennai|rider-213|driver-213| 41.06290929046368| +-------------------+--------------------+----------------------+---------+----------+------------------+ scala> val updates = convertToStringList(dataGen.generateUpdates(10)) updates: java.util.List[String] = [{"ts": 0.0, "uuid": "6e340dc7-0992-454c-a069-816b8de93a22", "rider": "rider-284", "driver": "driver-284", "begin_lat": 0.7340133901254792, "begin_lon": 0.5142184937933181, "end_lat": 0.7814655558162802, "end_lon": 0.6592596683641996, "fare": 49.527694252432056, "partitionpath": "americas/united_states/san_francisco"}, {"ts": 0.0, "uuid": "d41c911e-41b5-4c54-ac4f-492623c5a8fe", "rider": 
"rider-284", "driver": "driver-284", "begin_lat": 0.1593867607188556, "begin_lon": 0.010872312870502165, "end_lat": 0.9808530350038475, "end_lon": 0.7963756520507014, "fare": 29.47661370147079, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 0.0, "uuid": "d41c911e-41b5-4c54-ac4f-492623c5a8fe", "rider": "rider-284", "driver": "driver-284", "begin_lat": 0.71801964677... scala> val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)); warning: there was one deprecation warning; re-run with -deprecation for details df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields] scala> df.write.format("org.apache.hudi"). | options(getQuickstartWriteConfigs). | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | option(TABLE_NAME, tableName). | mode(Append). | save(basePath); scala> // reload data scala> spark. | read. | format("org.apache.hudi"). | load(basePath + "/*/*/*/*"). | createOrReplaceTempView("hudi_ro_table") scala> scala> val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50) commits: Array[String] = Array(20200117133031, 20200117133058) scala> val beginTime = commits(commits.length - 2) // commit time we are interested in beginTime: String = 20200117133031 scala> scala> // incrementally query data scala> val incViewDF = spark. | read. | format("org.apache.hudi"). | option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL). | option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). | load(basePath); incViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 
13 more fields] scala> incViewDF.registerTempTable("hudi_incr_table") warning: there was one deprecation warning; re-run with -deprecation for details scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show() +-------------------+------------------+--------------------+-------------------+---+ |_hoodie_commit_time| fare| begin_lon| begin_lat| ts| +-------------------+------------------+--------------------+-------------------+---+ | 20200117133058|49.527694252432056| 0.5142184937933181| 0.7340133901254792|0.0| | 20200117133058| 98.3428192817987| 0.3349917833248327| 0.4777395067707303|0.0| | 20200117133058| 90.9053809533154| 0.19949323322922063|0.18294079059016366|0.0| | 20200117133058| 90.25710109008239| 0.4006983139989222|0.08528650347654165|0.0| | 20200117133058| 63.72504913279929| 0.888493603696927| 0.6570857443423376|0.0| | 20200117133058| 29.47661370147079|0.010872312870502165| 0.1593867607188556|0.0| +-------------------+------------------+--------------------+-------------------+---+ scala> val beginTime = "000" // Represents all commits > this time. beginTime: String = 000 scala> val endTime = commits(commits.length - 2) // commit time we are interested in endTime: String = 20200117133031 scala> scala> //incrementally query data scala> val incViewDF = spark.read.format("org.apache.hudi"). | option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL). | option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). | option(END_INSTANTTIME_OPT_KEY, endTime). | load(basePath); incViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 
13 more fields] scala> incViewDF.registerTempTable("hudi_incr_table") warning: there was one deprecation warning; re-run with -deprecation for details scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show() +-------------------+------------------+-------------------+-------------------+---+ |_hoodie_commit_time| fare| begin_lon| begin_lat| ts| +-------------------+------------------+-------------------+-------------------+---+ | 20200117133031| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0| | 20200117133031| 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0| | 20200117133031| 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0| | 20200117133031| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0| | 20200117133031| 43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0| | 20200117133031| 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0| | 20200117133031|34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0| | 20200117133031| 41.06290929046368| 0.8192868687714224| 0.651058505660742|0.0| +-------------------+------------------+-------------------+-------------------+---+ scala>
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
