I'm using *1.3.0.M2 & Scalding 0.9.0rc4*

I'm overriding the config object and set the es.mapping.id to "number"

import com.twitter.scalding.{Job, Args}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

class JobBase(args: Args) extends Job(args) {
  // Overide JobConfig
  override def config : Map[AnyRef,AnyRef] = {
    super.config ++ Map (ConfigurationOptions.ES_MAPPING_ID -> "number") ++ 
Map(ConfigurationOptions.ES_WRITE_OPERATION -> "index")
  }  
}

My Scalding Job looks like that 

class ElasticSearchUpdateIndexes(args: Args) extends JobBase(args) {

  // Some data to push into elastic-search
  val someData = List(
    ("1","product1", "description1"),
    ("2","product2", "description2"),
    ("3","product3", "description3"))

  val indexNewDataInElasticSearch =
    IterableSource[(String,String, String)](someData, ('number, 'product, 
'description))
      .write(ElasticSearchSource("localhost", 9200,"index_es/type_es"))
}

And the wrapper that i'm trying to implement and contribute to Scalding for 
the ElasticSearchSource currently looks like this:

case class ElasticSearchSource(
   es_host :String="localhost",
   es_port :Int   = 9200,
   es_resource:String="scalding_index/type",
   es_fields : Fields = Fields.ALL)
  extends Source {

  def createLocalTap: Tap[_, _, _] =
    new EsTap(es_host, es_port, es_resource,"",es_fields)

  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): 
Tap[_, _, _] = {
    mode match {
      case Local(_) => {
        createLocalTap
      }
    }
  }
}

My problem is that once i introduce :

++ Map (ConfigurationOptions.ES_MAPPING_ID -> "number")

I'm getting 

cascading.tuple.TupleException: unable to sink into output identifier: 
'unknown'

My second concern is around the usage of the 
ConfigurationOptions.ES_MAPPING_ID as part of the JobConfiguration
I understand the benefits of that approach , for Hive/Pig, but I think for 
Cascading/Scalding - having a single property is inefficient

What i would ideally be able to do in Scalding is the following:

val productSales = data
   .filterProductsBoughtWithOffer("summer-offer-14")
   .project('productID, 'customerID, 'quantity)
   .write(ElasticSearchSource("localhost", 9200,"offers/summer-offer-14"), 
'productID)  // productID is the em.mapping.id
   .joinWithSmaller('customerID -> 'customerID, customerData)
   .write(ElasticSearchSource("localhost", 9200,"customers/got-offer"), 
'customerID)    // customerID is the em.mapping.id

So i would like within a Single Job to have multiple elastic-search sources 
& sinks.
My understanding at the moment is that elasticsearch-hadoop will not allow 
me to configure all sources..

Anyhow i'm just looking for some help in implementing this capability in 
Scalding ..
Any help appreciated

Antonios

-- 
You received this message because you are subscribed to the Google Groups 
"elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/elasticsearch/2e78f7d6-7142-4225-8afa-69f29e509048%40googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Reply via email to