+1 for Scala stream like api.

I would like to help developing Scala like API.  I was doing prototype for
similar api in scala to generate linear DAG.  I was facing problem
designing stream like
api to generate arbitrary DAG, that our platform supports. and support all
functionality of the the platform, like specifying attributes, adding
custom operators to DAG.

The repository is :
https://github.com/tushargosavi/apexstream

A simple wordcount application is, With lambda function support, it is very
easy to write simple applications.

```java
@ApplicationAnnotation(name="TestApp")
class TestApp extends StreamingApplication {

  override def populateDAG(dag: DAG, conf: Configuration): Unit = {

    val ctx : DTContext = new DTContext(dag, conf)

    ctx.fileInput("/user/tushar/data") // this wraps FileInputOperator ,
      .flatMap(_.split(" ")) // split the word in split and emit individual
words
      .filter(_.length > 0) // filter out empty words
      .addOperator[java.util.Map[String, Int]](new UniqueCounter()) // use
malhar operator
      .flatMap(_.asScala.map(_.toString())) // convert map to key value
pairs
      .print()

    ctx.build
  }
}
```

The fileInput function is nothing but
```
  def fileInput(dir : String) : Stream[String] = {
    val op = new FileLineInputOperator()
    op.setDirectory(dir)
    source[String](op)
  }
```


The repository has basic support for map, flatMap, filter, reduce, count
support which works on application window boundaries.  We will also
need shell where user can quickly type application and submit it execution
from shell directly while testing.

Another approach is to adopt Fink DataStream API or Spark Stream API. But I
feel having similar but our own API which can allow specifying all
attributes, like
partitioner, streamcodec, other attributes will be a big +.


- Tushar.



On Thu, Dec 24, 2015 at 11:28 AM, Sandeep Deshmukh <[email protected]>
wrote:

> +1 on not coming up with our own APIs. We should adapt to existing ones  so
> that there is additional learning curve for Apex users.
>
> It is more of a functional way of specifying the DAG and a subset of scala
> could be a good starting point.
>
> Regards,
> Sandeep
>
> On Thu, Dec 24, 2015 at 5:21 AM, Siyuan Hua <[email protected]>
> wrote:
>
> > My first suggestion is we should focus on Stream API(or change the name
> we
> > call it) for now.  High-level API is confusion and could be anything that
> > helps.
> >
> > Stream is in fact more well-known concept other than Operators, ports,
> > connector, etc. I think the idea originate from scala sequence API(
> > http://www.scala-lang.org/api/current/#scala.collection.Seq)
> > And the term "Stream" already implies some minimal function we need ex.
> > "map"(t1->f->t1'), "reduce" (t1, t2,...  -> f -> t1'), "filter" (t1
> ->f(if
> > true) -> t1)
> > We shouldn't come up with arbitrary things so the API would become
> > cumbersome and hard to learn.
> >
> >
> >
> >
> >
> >
> > On Wed, Dec 23, 2015 at 3:12 PM, Siyuan Hua <[email protected]>
> > wrote:
> >
> > > Another API that could be a reference is
> > > http://storm.apache.org/documentation/Trident-API-Overview.html
> > >
> > > On Wed, Dec 23, 2015 at 3:09 PM, Ashwin Chandra Putta <
> > > [email protected]> wrote:
> > >
> > >> // Made few edits, ignore previous mail. Read this instead.
> > >>
> > >> David,
> > >>
> > >> I can imagine that it boils down to something like these function
> calls.
> > >>
> > >> DtString lines = readLines(new LineReader());
> > >> DtString words = lines.split(new LineSplitter());
> > >> DtNumber count  = words.count(new Counter());
> > >> count.print(new ConoleOutputOperator());
> > >>
> > >> Or
> > >>
> > >> readLines(new LineReader())
> > >>   .split(new LineSplitter())
> > >>   .count(new Counter())
> > >>   .print(new ConoleOutputOperator());
> > >>
> > >>
> > >> which translates to
> > >>
> > >> Reader reader = dag.addOperator("ReadLines", new LineReader());
> > >> Splitter splitter = dag.addOperator("Split", new LineSplitter());
> > >> Counter counter = dag.addOperator("Count", new WordCounter());
> > >> ConsoleOutputOperator console = dag.addOperator("Print", new
> > >> ConsoleOutputOperator());
> > >>
> > >> dag.addStream("lines", reader.output, splitter.input);
> > >> dag.addStream("words", splitter.output, counter.input);
> > >> dag.addStream("count", counter.output, console.input);
> > >>
> > >> Here are my initial thoughts:
> > >>
> > >> For the higher level api to work, we need the following support at
> > least.
> > >>
> > >> 1. The operators used in the higher level api should have concrete
> > >> implementations with all available input and output ports defined at
> the
> > >> abstract level. Have to think more about how multiple output ports
> will
> > >> play out.
> > >> 2. We need to define the objects that have method calls available on
> > them
> > >> that take operators as parameters.
> > >>
> > >> Eg: DtString can have method split and takes Splitter operator. And
> > >> Splitter operator should be abstract with input port type DtString and
> > >> output port type DtString. LineSplitter will be a concrete
> > implementation
> > >> of this operator.
> > >>
> > >> Regards,
> > >> Ashwin.
> > >>
> > >> On Wed, Dec 23, 2015 at 3:07 PM, Ashwin Chandra Putta <
> > >> [email protected]> wrote:
> > >>
> > >> > David,
> > >> >
> > >> > I can imagine that it boils down to something like these function
> > calls.
> > >> >
> > >> > DtString lines = readLines(new LineReader());
> > >> > DtString words = lines.split(new LineSplitter());
> > >> > DtNumber count  = words.count(new Counter());
> > >> > count.print(new ConoleOutputOperator());
> > >> >
> > >> > Or
> > >> >
> > >> > readLines(new LineReader())
> > >> >   .split(new LineSplitter())
> > >> >   .count(new Counter())
> > >> >   .print(new ConoleOutputOperator());
> > >> >
> > >> >
> > >> > which translates to
> > >> >
> > >> > Reader reader = dag.addOperator("ReadLines", new LineReader());
> > >> > LineSplitter splitter = dag.addOperator("Split", new
> LineSplitter());
> > >> > WordCounter counter = dag.addOperator("Count", new Counter());
> > >> > ConsoleOutputOperator console = dag.addOperator("Print", new
> > >> > ConsoleOutputOperator());
> > >> >
> > >> > dag.addStream("lines", reader.output, splitter.input);
> > >> > dag.addStream("words", splitter.output, counter.input);
> > >> > dag.addStream("count", counter.output, console.input);
> > >> >
> > >> > Here are my initial thoughts:
> > >> >
> > >> > For the higher level api to work, we need the following support at
> > >> least.
> > >> >
> > >> > 1. The operators used in the higher level api should have concrete
> > >> > implementations with all available input and output ports defined at
> > the
> > >> > abstract level. Have to think more about how multiple output ports
> > will
> > >> > play out.
> > >> > 2. We need to define the objects that have method calls available on
> > >> them
> > >> > that take operators as parameters.
> > >> >
> > >> > Eg: DtString can have method split and takes Splitter operator. And
> > >> > Splitter operator should be abstract with input port type DtString
> and
> > >> > output port type DtString. LineSplitter will be a concrete
> > >> implementation
> > >> > of this operator.
> > >> >
> > >> > Regards,
> > >> > Ashwin.
> > >> >
> > >> > On Wed, Dec 23, 2015 at 1:42 PM, David Yan <[email protected]>
> > >> wrote:
> > >> >
> > >> >> Hi fellow Apex developers:
> > >> >>
> > >> >> Apex has a comprehensive API for constructing DAG topologies for
> > >> streaming
> > >> >> applications, using operators, ports and streams.  But this may
> seem
> > >> too
> > >> >> much for folks who just want to build simple applications, or just
> to
> > >> >> learn
> > >> >> about Apex.  For example, when you compare the code to do word
> count
> > in
> > >> >> Apex with Spark Streaming or Flink, Apex requires much more code.
> > >> >>
> > >> >> Apex:
> > >> >>
> > >> >>
> > >>
> >
> https://github.com/apache/incubator-apex-malhar/tree/master/demos/wordcount/src/main
> > >> >>
> > >> >> Spark Streaming:
> > >> >>
> > >> >>
> > >>
> >
> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
> > >> >>
> > >> >> Flink:
> > >> >>
> > >> >>
> > >>
> >
> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
> > >> >>
> > >> >> Note that their Scala versions are even simpler to use.
> > >> >>
> > >> >> The high-level requirements I have in mind is as follow:
> > >> >>
> > >> >> 1. A simple-to-use high-level API similar to what Spark Streaming
> and
> > >> >> Flink
> > >> >> have. And from the high-level API, the Apex engine will construct
> the
> > >> >> actual DAG topology at launch time.
> > >> >>
> > >> >> 2. The first language we will support is Java, but we will also
> want
> > to
> > >> >> support Scala and possibly Python at some point, so the high-level
> > API
> > >> >> should make it easy for implementing bindings for at least these
> two
> > >> >> languages.
> > >> >>
> > >> >> 3. We should be able to use the high-level API in Apex App Package
> > >> (apa)
> > >> >> file, so that dtcli can launch it just like a regular apa today.
> > >> >>
> > >> >> Please provide your ideas and thoughts on this topic.
> > >> >>
> > >> >> Thanks,
> > >> >>
> > >> >> David
> > >> >>
> > >> >
> > >> >
> > >> >
> > >> > --
> > >> >
> > >> > Regards,
> > >> > Ashwin.
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >>
> > >> Regards,
> > >> Ashwin.
> > >>
> > >
> > >
> >
>

Reply via email to