bvaradar commented on issue #1226: [HUDI-238] Make Hudi support Scala 2.12
URL: https://github.com/apache/incubator-hudi/pull/1226#issuecomment-575785278
 
 
   I tested @zhedoubushishi's changes by building Hudi with Scala 2.12 and running the Hudi quickstart with the resulting packages and spark-3.0.0-preview2 (which is built with Scala 2.12). It ran successfully!
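   
   For reference, a minimal sketch of how the Scala 2.12 artifacts were produced locally before launching the shell below (this assumes the scala-2.12 build flag this PR introduces; the exact flag/profile name may differ):
   
   # Sketch: build Hudi from the PR branch against Scala 2.12 (flag name assumed)
   mvn clean install -DskipTests -Dscala-2.12
   # The resulting hudi-spark-bundle_2.12:0.5.1-SNAPSHOT is then resolved from the
   # local Maven cache by the spark-shell --packages invocation below.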
   
   (base) varadarb-C02SH0P1G8WL:zhedoubushishi_hudi varadarb$ ~/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle_2.12:0.5.1-SNAPSHOT,org.apache.spark:spark-avro_2.12:3.0.0-preview2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
   20/01/17 12:22:40 WARN Utils: Your hostname, varadarb-C02SH0P1G8WL resolves 
to a loopback address: 127.0.0.1; using 172.26.16.136 instead (on interface en0)
   20/01/17 12:22:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to 
another address
   Ivy Default Cache set to: /Users/varadarb/.ivy2/cache
   The jars for the packages stored in: /Users/varadarb/.ivy2/jars
   :: loading settings :: url = 
jar:file:/Users/varadarb/spark-3.0.0-preview2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
   org.apache.hudi#hudi-spark-bundle_2.12 added as a dependency
   org.apache.spark#spark-avro_2.12 added as a dependency
   :: resolving dependencies :: 
org.apache.spark#spark-submit-parent-e8c0c949-bada-47b9-a444-5b32205e1ff3;1.0
        confs: [default]
        found org.apache.hudi#hudi-spark-bundle_2.12;0.5.1-SNAPSHOT in 
local-m2-cache
        found org.apache.spark#spark-avro_2.12;3.0.0-preview2 in central
        found org.spark-project.spark#unused;1.0.0 in spark-list
   downloading 
https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.0.0-preview2/spark-avro_2.12-3.0.0-preview2.jar
 ...
        [SUCCESSFUL ] 
org.apache.spark#spark-avro_2.12;3.0.0-preview2!spark-avro_2.12.jar (44ms)
   :: resolution report :: resolve 1173ms :: artifacts dl 48ms
        :: modules in use:
        org.apache.hudi#hudi-spark-bundle_2.12;0.5.1-SNAPSHOT from 
local-m2-cache in [default]
        org.apache.spark#spark-avro_2.12;3.0.0-preview2 from central in 
[default]
        org.spark-project.spark#unused;1.0.0 from spark-list in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   3   |   1   |   1   |   0   ||   3   |   1   |
        ---------------------------------------------------------------------
   
   :: problems summary ::
   :::: ERRORS
        unknown resolver null
   
   
   :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
   :: retrieving :: 
org.apache.spark#spark-submit-parent-e8c0c949-bada-47b9-a444-5b32205e1ff3
        confs: [default]
        1 artifacts copied, 2 already retrieved (146kB/6ms)
   20/01/17 12:22:42 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
   Using Spark's default log4j profile: 
org/apache/spark/log4j-defaults.properties
   Setting default log level to "WARN".
   To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
   Spark context Web UI available at 
http://varadarb-c02sh0p1g8wl.corp.uber.com:4040
   Spark context available as 'sc' (master = local[*], app id = 
local-1579292567995).
   Spark session available as 'spark'.
   Welcome to
         ____              __
        / __/__  ___ _____/ /__
       _\ \/ _ \/ _ `/ __/  '_/
      /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-preview2
         /_/
            
   Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 
1.8.0_191)
   Type in expressions to have them evaluated.
   Type :help for more information.
   
   scala> import org.apache.hudi.QuickstartUtils._
   import org.apache.hudi.QuickstartUtils._
   
   scala> import scala.collection.JavaConversions._
   import scala.collection.JavaConversions._
   
   scala> import org.apache.spark.sql.SaveMode._
   import org.apache.spark.sql.SaveMode._
   
   scala> import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceReadOptions._
   
   scala> import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   
   scala> import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.hudi.config.HoodieWriteConfig._
   
   scala> 
   
   scala> val tableName = "hudi_cow_table"
   tableName: String = hudi_cow_table
   
   scala> val basePath = "file:///tmp/hudi_cow_table"
   basePath: String = file:///tmp/hudi_cow_table
   
   scala> val dataGen = new DataGenerator
   dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = 
org.apache.hudi.QuickstartUtils$DataGenerator@1c00809b
   
   scala> val inserts = convertToStringList(dataGen.generateInserts(10))
   inserts: java.util.List[String] = [{"ts": 0.0, "uuid": 
"44e96c81-a406-4429-a56e-d6f6121a1182", "rider": "rider-213", "driver": 
"driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 
0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 
0.9671159942018241, "fare": 34.158284716382845, "partitionpath": 
"americas/brazil/sao_paulo"}, {"ts": 0.0, "uuid": 
"4e31e5af-348a-4b39-ada5-e20213e8c307", "rider": "rider-213", "driver": 
"driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, 
"end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 
43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 0.0, 
"uuid": "6f24f083-eff9-4328-b1ee-a54889cfd6e2", "rider": "rider-213", "driver": 
"driver-213", "begin_lat": 0...
   
   scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
   warning: there was one deprecation warning (since 2.12.0)
   warning: there was one deprecation warning (since 2.2.0)
   warning: there were two deprecation warnings in total; for details, enable 
`:setting -deprecation' or `:replay -deprecation'
   df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double 
... 8 more fields]
   
   scala> df.write.format("org.apache.hudi").
        |     options(getQuickstartWriteConfigs).
        |     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        |     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        |     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        |     option(TABLE_NAME, tableName).
        |     mode(Overwrite).
        |     save(basePath);
                                                                                
   
   scala> val roViewDF = spark.
        |     read.
        |     format("org.apache.hudi").
        |     load(basePath + "/*/*/*/*")
   roViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, 
_hoodie_commit_seqno: string ... 13 more fields]
   
   scala> roViewDF.createOrReplaceTempView("hudi_ro_table")
   
   scala> spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_ro_table 
where fare > 20.0").show()
   +------------------+-------------------+-------------------+---+
   |              fare|          begin_lon|          begin_lat| ts|
   +------------------+-------------------+-------------------+---+
   | 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
   | 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0|
   | 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0|
   | 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0|
   | 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0|
   |  43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0|
   |34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0|
   | 41.06290929046368| 0.8192868687714224|  0.651058505660742|0.0|
   +------------------+-------------------+-------------------+---+
   
   
   scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, 
_hoodie_partition_path, rider, driver, fare from  hudi_ro_table").show()
   
+-------------------+--------------------+----------------------+---------+----------+------------------+
   |_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|  
  driver|              fare|
   
+-------------------+--------------------+----------------------+---------+----------+------------------+
   |     20200117122356|91e41b3e-1c25-403...|  
americas/united_s...|rider-213|driver-213|19.179139106643607|
   |     20200117122356|17ee8945-22d2-496...|  
americas/united_s...|rider-213|driver-213| 93.56018115236618|
   |     20200117122356|129345e8-a885-428...|  
americas/united_s...|rider-213|driver-213| 33.92216483948643|
   |     20200117122356|420f2771-23b3-4df...|  
americas/united_s...|rider-213|driver-213| 27.79478688582596|
   |     20200117122356|6f24f083-eff9-432...|  
americas/united_s...|rider-213|driver-213| 64.27696295884016|
   |     20200117122356|c2a0969f-3381-492...|  
americas/brazil/s...|rider-213|driver-213| 66.62084366450246|
   |     20200117122356|4e31e5af-348a-4b3...|  
americas/brazil/s...|rider-213|driver-213|  43.4923811219014|
   |     20200117122356|44e96c81-a406-442...|  
americas/brazil/s...|rider-213|driver-213|34.158284716382845|
   |     20200117122356|8cd77166-8a33-47c...|    
asia/india/chennai|rider-213|driver-213| 41.06290929046368|
   |     20200117122356|7efe1602-e219-419...|    
asia/india/chennai|rider-213|driver-213|17.851135255091155|
   
+-------------------+--------------------+----------------------+---------+----------+------------------+
   
   
   scala> val updates = convertToStringList(dataGen.generateUpdates(10))
   updates: java.util.List[String] = [{"ts": 0.0, "uuid": 
"6f24f083-eff9-4328-b1ee-a54889cfd6e2", "rider": "rider-284", "driver": 
"driver-284", "begin_lat": 0.7340133901254792, "begin_lon": 0.5142184937933181, 
"end_lat": 0.7814655558162802, "end_lon": 0.6592596683641996, "fare": 
49.527694252432056, "partitionpath": "americas/united_states/san_francisco"}, 
{"ts": 0.0, "uuid": "44e96c81-a406-4429-a56e-d6f6121a1182", "rider": 
"rider-284", "driver": "driver-284", "begin_lat": 0.1593867607188556, 
"begin_lon": 0.010872312870502165, "end_lat": 0.9808530350038475, "end_lon": 
0.7963756520507014, "fare": 29.47661370147079, "partitionpath": 
"americas/brazil/sao_paulo"}, {"ts": 0.0, "uuid": 
"44e96c81-a406-4429-a56e-d6f6121a1182", "rider": "rider-284", "driver": 
"driver-284", ...
   
   scala> val df = spark.read.json(spark.sparkContext.parallelize(updates, 2));
   warning: there was one deprecation warning (since 2.12.0)
   warning: there was one deprecation warning (since 2.2.0)
   warning: there were two deprecation warnings in total; for details, enable 
`:setting -deprecation' or `:replay -deprecation'
   df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double 
... 8 more fields]
   
   scala> df.write.format("org.apache.hudi").
        |     options(getQuickstartWriteConfigs).
        |     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        |     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        |     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        |     option(TABLE_NAME, tableName).
        |     mode(Append).
        |     save(basePath);
                                                                                
   
   scala> spark.
        |     read.
        |     format("org.apache.hudi").
        |     load(basePath + "/*/*/*/*").
        |     createOrReplaceTempView("hudi_ro_table")
   
   scala> 
   
   scala> val commits = spark.sql("select distinct(_hoodie_commit_time) as 
commitTime from  hudi_ro_table order by commitTime").map(k => 
k.getString(0)).take(50)
   commits: Array[String] = Array(20200117122356, 20200117122430)               
   
   
   scala> val beginTime = commits(commits.length - 2) // commit time we are 
interested in
   beginTime: String = 20200117122356
   
   scala> 
   
   scala> // incrementally query data
   
   scala> val incViewDF = spark.
        |     read.
        |     format("org.apache.hudi").
        |     option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
        |     option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
        |     load(basePath);
   incViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, 
_hoodie_commit_seqno: string ... 13 more fields]
   
   scala> incViewDF.createOrReplaceTempView("hudi_incr_table")
   
   scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, 
ts from  hudi_incr_table where fare > 20.0").show()
   
+-------------------+------------------+--------------------+-------------------+---+
   |_hoodie_commit_time|              fare|           begin_lon|          
begin_lat| ts|
   
+-------------------+------------------+--------------------+-------------------+---+
   |     20200117122430|  90.9053809533154| 
0.19949323322922063|0.18294079059016366|0.0|
   |     20200117122430|  98.3428192817987|  0.3349917833248327| 
0.4777395067707303|0.0|
   |     20200117122430|49.527694252432056|  0.5142184937933181| 
0.7340133901254792|0.0|
   |     20200117122430| 90.25710109008239|  
0.4006983139989222|0.08528650347654165|0.0|
   |     20200117122430| 63.72504913279929|   0.888493603696927| 
0.6570857443423376|0.0|
   |     20200117122430| 29.47661370147079|0.010872312870502165| 
0.1593867607188556|0.0|
   
+-------------------+------------------+--------------------+-------------------+---+
   
   
   scala> val beginTime = "000" // Represents all commits > this time.
   beginTime: String = 000
   
   scala> val endTime = commits(commits.length - 2) // commit time we are 
interested in
   endTime: String = 20200117122356
   
   scala> 
   
   scala> //incrementally query data
   
   scala> val incViewDF = spark.read.format("org.apache.hudi").
        |     option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
        |     option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
        |     option(END_INSTANTTIME_OPT_KEY, endTime).
        |     load(basePath);
   incViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, 
_hoodie_commit_seqno: string ... 13 more fields]
   
   scala> incViewDF.createOrReplaceTempView("hudi_incr_table")
   
   scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, 
ts from  hudi_incr_table where fare > 20.0").show()
   
+-------------------+------------------+-------------------+-------------------+---+
   |_hoodie_commit_time|              fare|          begin_lon|          
begin_lat| ts|
   
+-------------------+------------------+-------------------+-------------------+---+
   |     20200117122356| 
93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
   |     20200117122356| 33.92216483948643| 0.9694586417848392| 
0.1856488085068272|0.0|
   |     20200117122356| 27.79478688582596| 
0.6273212202489661|0.11488393157088261|0.0|
   |     20200117122356| 64.27696295884016| 0.4923479652912024| 
0.5731835407930634|0.0|
   |     20200117122356| 66.62084366450246|0.03844104444445928| 
0.0750588760043035|0.0|
   |     20200117122356|  43.4923811219014| 0.8779402295427752| 
0.6100070562136587|0.0|
   |     20200117122356|34.158284716382845|0.46157858450465483| 
0.4726905879569653|0.0|
   |     20200117122356| 41.06290929046368| 0.8192868687714224|  
0.651058505660742|0.0|
   
+-------------------+------------------+-------------------+-------------------+---+
   
   
   scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, 
ts from  hudi_incr_table where fare > 20.0").show()
   
+-------------------+------------------+-------------------+-------------------+---+
   |_hoodie_commit_time|              fare|          begin_lon|          
begin_lat| ts|
   
+-------------------+------------------+-------------------+-------------------+---+
   |     20200117122356| 
93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
   |     20200117122356| 33.92216483948643| 0.9694586417848392| 
0.1856488085068272|0.0|
   |     20200117122356| 27.79478688582596| 
0.6273212202489661|0.11488393157088261|0.0|
   |     20200117122356| 64.27696295884016| 0.4923479652912024| 
0.5731835407930634|0.0|
   |     20200117122356| 66.62084366450246|0.03844104444445928| 
0.0750588760043035|0.0|
   |     20200117122356|  43.4923811219014| 0.8779402295427752| 
0.6100070562136587|0.0|
   |     20200117122356|34.158284716382845|0.46157858450465483| 
0.4726905879569653|0.0|
   |     20200117122356| 41.06290929046368| 0.8192868687714224|  
0.651058505660742|0.0|
   
+-------------------+------------------+-------------------+-------------------+---+
   
   
   scala> 
   
