bvaradar commented on issue #1226: [HUDI-238] Make Hudi support Scala 2.12
URL: https://github.com/apache/incubator-hudi/pull/1226#issuecomment-575802938
 
 
   This is the same set of tests, run with Spark 2.4.4, Scala 2.11, and the Hudi bundle (hudi-spark-bundle_2.11:0.5.1-SNAPSHOT):
   
   (base) varadarb-C02SH0P1G8WL:zhedoubushishi_hudi varadarb$ 
~/spark-2.4.4-bin-hadoop2.7/bin/spark-shell --packages 
org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-SNAPSHOT,org.apache.spark:spark-avro_2.11:2.4.2
 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
   Ivy Default Cache set to: /Users/varadarb/.ivy2/cache
   The jars for the packages stored in: /Users/varadarb/.ivy2/jars
   :: loading settings :: url = 
jar:file:/Users/varadarb/spark-2.4.4-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
   org.apache.hudi#hudi-spark-bundle_2.11 added as a dependency
   org.apache.spark#spark-avro_2.11 added as a dependency
   :: resolving dependencies :: 
org.apache.spark#spark-submit-parent-db5734f8-665a-44e6-a55c-d389e9c9b0d4;1.0
        confs: [default]
        found org.apache.hudi#hudi-spark-bundle_2.11;0.5.1-SNAPSHOT in 
local-m2-cache
        found org.apache.spark#spark-avro_2.11;2.4.2 in central
        found org.spark-project.spark#unused;1.0.0 in spark-list
   :: resolution report :: resolve 233ms :: artifacts dl 6ms
        :: modules in use:
        org.apache.hudi#hudi-spark-bundle_2.11;0.5.1-SNAPSHOT from 
local-m2-cache in [default]
        org.apache.spark#spark-avro_2.11;2.4.2 from central in [default]
        org.spark-project.spark#unused;1.0.0 from spark-list in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
        ---------------------------------------------------------------------
   :: retrieving :: 
org.apache.spark#spark-submit-parent-db5734f8-665a-44e6-a55c-d389e9c9b0d4
        confs: [default]
        0 artifacts copied, 3 already retrieved (0kB/5ms)
   20/01/17 13:30:05 WARN Utils: Your hostname, varadarb-C02SH0P1G8WL resolves 
to a loopback address: 127.0.0.1; using 172.26.16.136 instead (on interface en0)
   20/01/17 13:30:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to 
another address
   20/01/17 13:30:06 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
   Using Spark's default log4j profile: 
org/apache/spark/log4j-defaults.properties
   Setting default log level to "WARN".
   To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
   Spark context Web UI available at 
http://varadarb-c02sh0p1g8wl.corp.uber.com:4040
   Spark context available as 'sc' (master = local[*], app id = 
local-1579296611401).
   Spark session available as 'spark'.
   Welcome to
         ____              __
        / __/__  ___ _____/ /__
       _\ \/ _ \/ _ `/ __/  '_/
      /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
         /_/
            
   Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 
1.8.0_191)
   Type in expressions to have them evaluated.
   Type :help for more information.
   
   scala> import org.apache.hudi.QuickstartUtils._
   import org.apache.hudi.QuickstartUtils._
   
   scala> import scala.collection.JavaConversions._
   import scala.collection.JavaConversions._
   
   scala> import org.apache.spark.sql.SaveMode._
   import org.apache.spark.sql.SaveMode._
   
   scala> import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceReadOptions._
   
   scala> import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   
   scala> import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.hudi.config.HoodieWriteConfig._
   
   scala> 
   
   scala> val tableName = "hudi_cow_table"
   tableName: String = hudi_cow_table
   
   scala> val basePath = "file:///tmp/hudi_cow_table"
   basePath: String = file:///tmp/hudi_cow_table
   
   scala> val dataGen = new DataGenerator
   dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = 
org.apache.hudi.QuickstartUtils$DataGenerator@22c4151b
   
   scala> val inserts = convertToStringList(dataGen.generateInserts(10))
   inserts: java.util.List[String] = [{"ts": 0.0, "uuid": 
"d41c911e-41b5-4c54-ac4f-492623c5a8fe", "rider": "rider-213", "driver": 
"driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 
0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 
0.9671159942018241, "fare": 34.158284716382845, "partitionpath": 
"americas/brazil/sao_paulo"}, {"ts": 0.0, "uuid": 
"c59f096d-ec05-4a18-aac4-67866c007925", "rider": "rider-213", "driver": 
"driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, 
"end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 
43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 0.0, 
"uuid": "6e340dc7-0992-454c-a069-816b8de93a22", "rider": "rider-213", "driver": 
"driver-213", "begin_lat": 0.5731835407930634, "begin_...
   scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
   warning: there was one deprecation warning; re-run with -deprecation for 
details
   df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double 
... 8 more fields]
   
   scala> df.write.format("org.apache.hudi").
        |     options(getQuickstartWriteConfigs).
        |     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        |     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        |     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        |     option(TABLE_NAME, tableName).
        |     mode(Overwrite).
        |     save(basePath);
                                                                                
   
   scala> val roViewDF = spark.
        |     read.
        |     format("org.apache.hudi").
        |     load(basePath + "/*/*/*/*")
   roViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, 
_hoodie_commit_seqno: string ... 13 more fields]
   
   scala> roViewDF.createOrReplaceTempView("hudi_ro_table")
   
   scala> spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_ro_table 
where fare > 20.0").show()
   20/01/17 13:30:53 WARN ObjectStore: Version information not found in 
metastore. hive.metastore.schema.verification is not enabled so recording the 
schema version 1.2.0
   20/01/17 13:30:53 WARN ObjectStore: Failed to get database global_temp, 
returning NoSuchObjectException
   +------------------+-------------------+-------------------+---+
   |              fare|          begin_lon|          begin_lat| ts|
   +------------------+-------------------+-------------------+---+
   | 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0|
   | 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
   | 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0|
   | 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0|
   |  43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0|
   | 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0|
   |34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0|
   | 41.06290929046368| 0.8192868687714224|  0.651058505660742|0.0|
   +------------------+-------------------+-------------------+---+
   
   
   scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, 
_hoodie_partition_path, rider, driver, fare from  hudi_ro_table").show()
   
+-------------------+--------------------+----------------------+---------+----------+------------------+
   |_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|  
  driver|              fare|
   
+-------------------+--------------------+----------------------+---------+----------+------------------+
   |     20200117133031|6e340dc7-0992-454...|  
americas/united_s...|rider-213|driver-213| 64.27696295884016|
   |     20200117133031|be18b898-c04c-4bf...|  
americas/united_s...|rider-213|driver-213| 93.56018115236618|
   |     20200117133031|73ba1d25-2cc5-407...|  
americas/united_s...|rider-213|driver-213| 27.79478688582596|
   |     20200117133031|82dbd63c-a6b6-4f7...|  
americas/united_s...|rider-213|driver-213| 33.92216483948643|
   |     20200117133031|da517676-c854-495...|  
americas/united_s...|rider-213|driver-213|19.179139106643607|
   |     20200117133031|c59f096d-ec05-4a1...|  
americas/brazil/s...|rider-213|driver-213|  43.4923811219014|
   |     20200117133031|cd150bb8-7f29-4e5...|  
americas/brazil/s...|rider-213|driver-213| 66.62084366450246|
   |     20200117133031|d41c911e-41b5-4c5...|  
americas/brazil/s...|rider-213|driver-213|34.158284716382845|
   |     20200117133031|31a5958c-2fac-4bf...|    
asia/india/chennai|rider-213|driver-213|17.851135255091155|
   |     20200117133031|95cbb4b3-118c-4d6...|    
asia/india/chennai|rider-213|driver-213| 41.06290929046368|
   
+-------------------+--------------------+----------------------+---------+----------+------------------+
   
   
   scala> val updates = convertToStringList(dataGen.generateUpdates(10))
   updates: java.util.List[String] = [{"ts": 0.0, "uuid": 
"6e340dc7-0992-454c-a069-816b8de93a22", "rider": "rider-284", "driver": 
"driver-284", "begin_lat": 0.7340133901254792, "begin_lon": 0.5142184937933181, 
"end_lat": 0.7814655558162802, "end_lon": 0.6592596683641996, "fare": 
49.527694252432056, "partitionpath": "americas/united_states/san_francisco"}, 
{"ts": 0.0, "uuid": "d41c911e-41b5-4c54-ac4f-492623c5a8fe", "rider": 
"rider-284", "driver": "driver-284", "begin_lat": 0.1593867607188556, 
"begin_lon": 0.010872312870502165, "end_lat": 0.9808530350038475, "end_lon": 
0.7963756520507014, "fare": 29.47661370147079, "partitionpath": 
"americas/brazil/sao_paulo"}, {"ts": 0.0, "uuid": 
"d41c911e-41b5-4c54-ac4f-492623c5a8fe", "rider": "rider-284", "driver": 
"driver-284", "begin_lat": 0.71801964677...
   scala> val df = spark.read.json(spark.sparkContext.parallelize(updates, 2));
   warning: there was one deprecation warning; re-run with -deprecation for 
details
   df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double 
... 8 more fields]
   
   scala> df.write.format("org.apache.hudi").
        |     options(getQuickstartWriteConfigs).
        |     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        |     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        |     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        |     option(TABLE_NAME, tableName).
        |     mode(Append).
        |     save(basePath);
                                                                                
   
   scala> // reload data
   
   scala> spark.
        |     read.
        |     format("org.apache.hudi").
        |     load(basePath + "/*/*/*/*").
        |     createOrReplaceTempView("hudi_ro_table")
   
   scala> 
   
   scala> val commits = spark.sql("select distinct(_hoodie_commit_time) as 
commitTime from  hudi_ro_table order by commitTime").map(k => 
k.getString(0)).take(50)
   commits: Array[String] = Array(20200117133031, 20200117133058)
   
   scala> val beginTime = commits(commits.length - 2) // commit time we are 
interested in
   beginTime: String = 20200117133031
   
   scala> 
   
   scala> // incrementally query data
   
   scala> val incViewDF = spark.
        |     read.
        |     format("org.apache.hudi").
        |     option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
        |     option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
        |     load(basePath);
   incViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, 
_hoodie_commit_seqno: string ... 13 more fields]
   
   scala> incViewDF.registerTempTable("hudi_incr_table")
   warning: there was one deprecation warning; re-run with -deprecation for 
details
   
   scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, 
ts from  hudi_incr_table where fare > 20.0").show()
   
+-------------------+------------------+--------------------+-------------------+---+
   |_hoodie_commit_time|              fare|           begin_lon|          
begin_lat| ts|
   
+-------------------+------------------+--------------------+-------------------+---+
   |     20200117133058|49.527694252432056|  0.5142184937933181| 
0.7340133901254792|0.0|
   |     20200117133058|  98.3428192817987|  0.3349917833248327| 
0.4777395067707303|0.0|
   |     20200117133058|  90.9053809533154| 
0.19949323322922063|0.18294079059016366|0.0|
   |     20200117133058| 90.25710109008239|  
0.4006983139989222|0.08528650347654165|0.0|
   |     20200117133058| 63.72504913279929|   0.888493603696927| 
0.6570857443423376|0.0|
   |     20200117133058| 29.47661370147079|0.010872312870502165| 
0.1593867607188556|0.0|
   
+-------------------+------------------+--------------------+-------------------+---+
   
   
   scala> val beginTime = "000" // Represents all commits > this time.
   beginTime: String = 000
   
   scala> val endTime = commits(commits.length - 2) // commit time we are 
interested in
   endTime: String = 20200117133031
   
   scala> 
   
   scala> //incrementally query data
   
   scala> val incViewDF = spark.read.format("org.apache.hudi").
        |     option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
        |     option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
        |     option(END_INSTANTTIME_OPT_KEY, endTime).
        |     load(basePath);
   incViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, 
_hoodie_commit_seqno: string ... 13 more fields]
   
   scala> incViewDF.registerTempTable("hudi_incr_table")
   warning: there was one deprecation warning; re-run with -deprecation for 
details
   
   scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, 
ts from  hudi_incr_table where fare > 20.0").show()
   
+-------------------+------------------+-------------------+-------------------+---+
   |_hoodie_commit_time|              fare|          begin_lon|          
begin_lat| ts|
   
+-------------------+------------------+-------------------+-------------------+---+
   |     20200117133031| 64.27696295884016| 0.4923479652912024| 
0.5731835407930634|0.0|
   |     20200117133031| 
93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
   |     20200117133031| 27.79478688582596| 
0.6273212202489661|0.11488393157088261|0.0|
   |     20200117133031| 33.92216483948643| 0.9694586417848392| 
0.1856488085068272|0.0|
   |     20200117133031|  43.4923811219014| 0.8779402295427752| 
0.6100070562136587|0.0|
   |     20200117133031| 66.62084366450246|0.03844104444445928| 
0.0750588760043035|0.0|
   |     20200117133031|34.158284716382845|0.46157858450465483| 
0.4726905879569653|0.0|
   |     20200117133031| 41.06290929046368| 0.8192868687714224|  
0.651058505660742|0.0|
   
+-------------------+------------------+-------------------+-------------------+---+
   
   
   scala>

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to