[ 
https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley updated SPARK-3573:
-------------------------------------
    Description: 
This JIRA is for discussion of the ML dataset, which is essentially a SchemaRDD 
with extra ML-specific metadata embedded in its schema.

.Sample code

Suppose we have training events stored on HDFS and user/ad features in Hive, and 
we want to assemble features for training and then apply a decision tree.
The proposed pipeline with dataset looks like the following (needs more 
refinement):

{code}
sqlContext.jsonFile("/path/to/training/events", 0.01).registerTempTable("event")
val training = sqlContext.sql("""
  SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId, 
event.action AS label,
         user.gender AS userGender, user.country AS userCountry, user.features 
AS userFeatures,
         ad.targetGender AS targetGender
    FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = 
ad.id;""").cache()

val indexer = new Indexer()
val interactor = new Interactor()
val fvAssembler = new FeatureVectorAssembler()
val treeClassifer = new DecisionTreeClassifer()

val paramMap = new ParamMap()
  .put(indexer.features, Map("userCountryIndex" -> "userCountry"))
  .put(indexer.sortByFrequency, true)
  .put(interactor.features, Map("genderMatch" -> Array("userGender", 
"targetGender")))
  .put(fvAssembler.features, Map("features" -> Array("genderMatch", 
"userCountryIndex", "userFeatures")))
  .put(fvAssembler.dense, true)
  .put(treeClassifer.maxDepth, 4) // By default, classifier recognizes 
"features" and "label" columns.

val pipeline = Pipeline.create(indexer, interactor, fvAssembler, treeClassifier)
val model = pipeline.fit(raw, paramMap)

sqlContext.jsonFile("/path/to/events", 0.01).registerTempTable("event")
val test = sqlContext.sql("""
  SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
         user.gender AS userGender, user.country AS userCountry, user.features 
AS userFeatures,
         ad.targetGender AS targetGender
    FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = 
ad.id;""")

val prediction = model.transform(test).select('eventId, 'prediction)
{code}

  was:
This JIRA is for discussion of ML dataset, essentially a SchemaRDD with extra 
ML-specific metadata embedded in its schema.

.Sample code

Suppose we have training events stored on HDFS and user/ad features in Hive, we 
want to assemble features for training and then apply decision tree.
The proposed pipeline with dataset looks like the following (need more 
refinements):

{code}
sqlContext.jsonFile("/path/to/training/events", 0.01).registerTempTable("event")
val training = sqlContext.sql("""
  SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId, 
event.action AS label,
         user.gender AS userGender, user.country AS userCountry, user.features 
AS userFeatures,
         ad.targetGender AS targetGender
    FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = 
ad.id;""").cache()

val indexer = new Indexer()
val interactor = new Interactor()
val fvAssembler = new FeatureVectorAssembler()
val treeClassifer = new DecisionTreeClassifer()

val paramMap = new ParamMap()
  .put(indexer.features, Map("userCountryIndex" -> "userCountry"))
  .put(indexer.sortByFrequency, true)
  .put(iteractor.features, Map("genderMatch" -> Array("userGender", 
"targetGender")))
  .put(fvAssembler.features, Map("features" -> Array("genderMatch", 
"userCountryIndex", "userFeatures")))
  .put(fvAssembler.dense, true)
  .put(treeClassifer.maxDepth, 4) // By default, classifier recognizes 
"features" and "label" columns.

val pipeline = Pipeline.create(indexer, interactor, fvAssembler, treeClassifier)
val model = pipeline.fit(raw, paramMap)

sqlContext.jsonFile("/path/to/events", 0.01).registerTempTable("event")
val test = sqlContext.sql("""
  SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
         user.gender AS userGender, user.country AS userCountry, user.features 
AS userFeatures,
         ad.targetGender AS targetGender
    FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = 
ad.id;""")

val prediction = model.transform(test).select('eventId, 'prediction)
{code}


> Dataset
> -------
>
>                 Key: SPARK-3573
>                 URL: https://issues.apache.org/jira/browse/SPARK-3573
>             Project: Spark
>          Issue Type: Sub-task
>          Components: MLlib
>            Reporter: Xiangrui Meng
>            Assignee: Xiangrui Meng
>            Priority: Critical
>
> This JIRA is for discussion of the ML dataset, which is essentially a SchemaRDD 
> with extra ML-specific metadata embedded in its schema.
> .Sample code
> Suppose we have training events stored on HDFS and user/ad features in Hive, 
> and we want to assemble features for training and then apply a decision tree.
> The proposed pipeline with dataset looks like the following (needs more 
> refinement):
> {code}
> sqlContext.jsonFile("/path/to/training/events", 
> 0.01).registerTempTable("event")
> val training = sqlContext.sql("""
>   SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId, 
> event.action AS label,
>          user.gender AS userGender, user.country AS userCountry, 
> user.features AS userFeatures,
>          ad.targetGender AS targetGender
>     FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = 
> ad.id;""").cache()
> val indexer = new Indexer()
> val interactor = new Interactor()
> val fvAssembler = new FeatureVectorAssembler()
> val treeClassifer = new DecisionTreeClassifer()
> val paramMap = new ParamMap()
>   .put(indexer.features, Map("userCountryIndex" -> "userCountry"))
>   .put(indexer.sortByFrequency, true)
>   .put(interactor.features, Map("genderMatch" -> Array("userGender", 
> "targetGender")))
>   .put(fvAssembler.features, Map("features" -> Array("genderMatch", 
> "userCountryIndex", "userFeatures")))
>   .put(fvAssembler.dense, true)
>   .put(treeClassifer.maxDepth, 4) // By default, classifier recognizes 
> "features" and "label" columns.
> val pipeline = Pipeline.create(indexer, interactor, fvAssembler, 
> treeClassifier)
> val model = pipeline.fit(raw, paramMap)
> sqlContext.jsonFile("/path/to/events", 0.01).registerTempTable("event")
> val test = sqlContext.sql("""
>   SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
>          user.gender AS userGender, user.country AS userCountry, 
> user.features AS userFeatures,
>          ad.targetGender AS targetGender
>     FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = 
> ad.id;""")
> val prediction = model.transform(test).select('eventId, 'prediction)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to