Hi Chris,

I am wondering what you mean when you say "we have no way of knowing [...] what 
the schema of the topic is (without looking at records)". How would you know 
that even if you did look at the records? I'm pretty sure that's impossible 
with Avro, since its binary encoding carries no field names or types at all; 
you need the writer's schema just to decode the bytes. With Protobuf and (at 
least some of) the Thrift formats, I believe there is some minimal metadata 
baked in, like field numbers and wire types, but still no field names. How 
would a SQL query work without metadata? Isn't it mandatory to have some sort 
of metadata repository for Samza SQL to work?

Of course, I imagine you could impose on the users the burden of invoking a 
(valid) DDL statement in their session before running their queries, but isn't 
that pretty inconvenient and brittle? It would preclude users from discovering 
what's available via SHOW TABLES, DESCRIBE TABLE and the like, which, in my 
experience, is half of the power SQL has to offer right there. Furthermore, 
requiring same-session DDL would only work for homogeneous topics, so schema 
evolution would be out of the question.

It seems like it would be trivial to also store key/partitioning information if 
you did have a way to define schemas, and in that regard, options abound: the 
Hive metastore 
(https://cwiki.apache.org/confluence/display/Hive/Design#Design-Metastore), 
HCat (https://cwiki.apache.org/confluence/display/Hive/HCatalog), the 
schema-repo (https://github.com/schema-repo/schema-repo) spawned from 
AVRO-1124, or whatever new thing will come out next... Alternatively, Julian 
seemed to hint that Calcite also had some metadata facilities. Of course 
there's also the possibility of a Samza home-grown solution or a Samza shim 
layer for any/all of the former options, though I think that extra work is 
what you wanted to avoid (which I think makes sense)... Given the solutions 
that already exist though, would it really be that much effort to leverage one 
of them?

--

Felix GV
Data Infrastructure Engineer
Distributed Data Systems
LinkedIn

f...@linkedin.com
linkedin.com/in/felixgv

________________________________________
From: Chris Riccomini [criccom...@apache.org]
Sent: Friday, January 30, 2015 10:43 AM
To: dev@samza.apache.org
Subject: Re: Streaming SQL - object models, ASTs and algebras

Hey all,

Just catching up on this thread. The Calcite + Samza approach seems pretty
compelling to me. I think most of what Julian is arguing for makes sense.
My main concern is with practicalities.

One specific case of this is the discussion about the partitioning model.
In an ideal world, I agree, developers wouldn't need to define partitions
in SQL. Practically speaking, though, Samza (Kafka, really) currently
doesn't have a metadata repository. Without a metadata repository, we have
no way of knowing 1) which key a topic is partitioned by, and 2) what the
schema of the topic is (without looking at records). We know this *within*
a query (or session), but not *between* queries (or sessions) from disjoint
users.

One could argue that we should spend time defining such a metadata
repository, which is a reasonable argument. But that's also a fairly large
effort. I wonder if we might be able to cut some corners in a clean-ish
forwards-compatible way, so that developers don't have to wait for us to
fully implement (or integrate with) something like a Hive metadata store.

In person, one thing you mentioned, Julian, was using hints rather than
stuff baked into the syntax. If that's stomach-able, we could support
partitioning through hints until we have a full-blown metadata store.
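
For illustration only, I'm picturing something along these lines, with the
hint carrying the partitioning metadata that a repository would otherwise
provide (the hint name, the stream and the columns here are all made up):

  -- Sketch: a hypothetical hint used until a real metadata store exists.
  SELECT STREAM /*+ PARTITIONED_BY(member_id, NUM_PARTITIONS = 8) */
    member_id, page_key
  FROM PageViewEvent;

The nice part is that queries written this way could keep working unchanged
once a metadata store shows up, since the hint would simply become redundant.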

Thoughts?

Cheers,
Chris

On Thu, Jan 29, 2015 at 5:10 PM, Julian Hyde <jul...@hydromatic.net> wrote:

>
> > On Jan 29, 2015, at 4:38 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >
> > I am wondering if I can get per-30-minute window averages?
> > I.e. the following are the input events in a stream:
> > {10:01, ORCL, 10, 10}
> > {10:02, MSFT, 30, 30}
> > {10:03, ORCL, 100, 110}
> > {10:17, MSFT, 45, 75}
> > {10:59, ORCL, 20, 130}
> > {11:02, ORCL, 50, 50}
> > Can I get the non-overlapping window averages for 10:00-10:29, 10:30-10:59,
> > ... ? Could you give an example of how to define that window operation in
> > your model? Note that in this use case, although I may have 1 trading
> > record per minute from the stream, I only generate 2 average records from
> > 10:00-11:00.
>
> That takes a bit of messy date arithmetic, made even more messy by the
> fact that you can't regard a SQL timestamp as a "milliseconds" value as one
> would in C or Java, nor can you divide it. Hence the trick of subtracting a
> constant timestamp, which yields an interval value, and then doing integer
> division on that interval to find the number of 30-minute periods since the
> epoch.
>
> select stream rowtime, ticker, amount,
>   sum(amount) over (
>     partition by ticker,
>       (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30
>     order by rowtime)
> from StockTrades;
>
> If I were doing this kind of calculation often I'd define a UDF, or even
> that user-defined window SPI I mentioned earlier.
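>
> Just to sketch what I mean (the function name below is made up, not an
> existing built-in), a UDF that buckets a timestamp into 30-minute periods
> would let the query read more naturally:
>
>   -- Hypothetical UDF; floor_to_period() is illustrative only.
>   select stream rowtime, ticker, amount,
>     sum(amount) over (
>       partition by ticker, floor_to_period(rowtime, interval '30' minute)
>       order by rowtime)
>   from StockTrades;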
>
> > {quote}
> > CREATE TABLE Emp(empno INTEGER, name VARCHAR(20), department VARCHAR(20))
> > PARTITION BY HASHCODE (department);
> > {quote}
> > That's good! At least it resolves my question on "which field is the
> > partition key". However, I still have a question about the number of
> > partitions. As I stated, since the system currently does not have an
> > "auto-scaling" feature, the number of partitions for a stream has to be
> > explicitly specified. Where do you suggest putting this information
> > without breaking SQL syntax?
>
> I imagine you could start the system on Tuesday with 10 partitions per
> stream and restart it on Wednesday with 8 or 12? You wouldn't want to
> change the SQL, because that's in the application. But you could change the
> definition of the stream, either in the DDL or in some other system
> configuration. Then the partitioning function (applied by the system to
> route the record) could, say, take the value of p modulo the current number
> of partitions.
>
> Julian
>
>
