[
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=88174&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88174
]
ASF GitHub Bot logged work on BEAM-3437:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Apr/18 18:02
Start Date: 05/Apr/18 18:02
Worklog Time Spent: 10m
Work Description: kennknowles commented on issue #4964: [BEAM-3437]
Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#issuecomment-379024918
It seems like this is a good idea that needs lots of baking, and that baking will
go best once it is in. How about we build a document with notes on follow-ups, or
an umbrella JIRA with subtasks? Otherwise I'm concerned the collection of
things we want to look into more specifically may get lost.
Being totally frank, the code seems fine; the fundamentals of what a
schema is are where I still have the most questions, especially as they pertain to
portability. At the portability layer, encodings (coders) and types are
synonymous. Within a particular language, there are the language's types that come
from coders. Then each SQL dialect has its own notion of standard types that
need not correspond to those of any general-purpose language. And of course Avro and
Proto have their own encoding-to-language mappings to contend with. I really
don't think Beam should add another.
So I want to get this in as experimental, but continue the work before we have
lots of dependencies on schemas. LGTM, pending a follow-up document or JIRA.
---
Reviewed 79 of 150 files at r1, 4 of 20 files at r4, 11 of 29 files at r5, 1
of 11 files at r6, 20 of 32 files at r7, 1 of 1 files at r8, 32 of 33 files at
r9.
Review status: all files reviewed at latest revision, 3 unresolved
discussions.
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java,
line 39 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9LipYfDPrPNLA5iboh:-L9LipYfDPrPNLA5iboi:b-kcvp9c)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java#L39)):*
> ```Java
> @Experimental
> public class RowCoder extends CustomCoder<Row> {
> private static final Map<TypeName, Coder> CODER_MAP =
> ImmutableMap.<TypeName, Coder>builder()
> ```
These should probably be defaults that can be overridden, not hardcoded.
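A minimal sketch of overridable defaults, assuming hypothetical reduced stand-ins so it runs without Beam: a two-member `TypeName`, a `FieldCoder` interface in place of Beam's `Coder`, and a `ConfigurableRowCoder`. None of these names are the PR's API. The per-type coders start from a default table, and each coder instance may replace entries.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, reduced stand-in for Schema.TypeName.
enum TypeName { INT32, STRING }

// Hypothetical, reduced stand-in for Beam's Coder; not the real Beam interface.
interface FieldCoder {
  void encode(Object value, DataOutputStream out) throws IOException;

  Object decode(DataInputStream in) throws IOException;
}

final class DefaultFieldCoders {
  static final FieldCoder INT32 =
      new FieldCoder() {
        public void encode(Object value, DataOutputStream out) throws IOException {
          out.writeInt((Integer) value);
        }

        public Object decode(DataInputStream in) throws IOException {
          return in.readInt();
        }
      };

  static final FieldCoder STRING =
      new FieldCoder() {
        public void encode(Object value, DataOutputStream out) throws IOException {
          out.writeUTF((String) value);
        }

        public Object decode(DataInputStream in) throws IOException {
          return in.readUTF();
        }
      };

  // Returns a fresh, mutable copy so callers can layer overrides on top.
  static Map<TypeName, FieldCoder> asMap() {
    Map<TypeName, FieldCoder> defaults = new EnumMap<>(TypeName.class);
    defaults.put(TypeName.INT32, INT32);
    defaults.put(TypeName.STRING, STRING);
    return defaults;
  }

  private DefaultFieldCoders() {}
}

// Hypothetical row coder: starts from the defaults, then applies per-instance overrides.
final class ConfigurableRowCoder {
  private final Map<TypeName, FieldCoder> coders;

  ConfigurableRowCoder(Map<TypeName, FieldCoder> overrides) {
    Map<TypeName, FieldCoder> merged = DefaultFieldCoders.asMap();
    merged.putAll(overrides); // caller-supplied coders win over the defaults
    this.coders = Collections.unmodifiableMap(merged);
  }

  static ConfigurableRowCoder withDefaults() {
    return new ConfigurableRowCoder(Collections.emptyMap());
  }

  FieldCoder coderFor(TypeName type) {
    return coders.get(type);
  }
}
```

Overriding one type leaves the others on their defaults, which a hardcoded static map cannot offer.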
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java,
line 52 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9LiaqH42LqLGYe_yRb:-L9LiaqH42LqLGYe_yRc:bz5ztdg)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java#L52)):*
> ```Java
> .build();
>
> private static final Map<TypeName, Integer> ESTIMATED_FIELD_SIZES =
> ```
Put the units in the name; at usage sites it will not be clear what they are.
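For example, the unit can go in both the constant and its accessor. This is a sketch with a hypothetical reduced `TypeName`; the sizes are fixed Java primitive widths, an illustrative assumption rather than necessarily what `RowCoder` estimates.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, reduced stand-in for Schema.TypeName.
enum TypeName { BYTE, INT32, INT64, DOUBLE }

final class FieldSizes {
  // The _BYTES suffix carries the unit, so call sites read unambiguously.
  // Values are fixed-width Java sizes, an assumption made for illustration.
  static final Map<TypeName, Integer> ESTIMATED_FIELD_SIZES_BYTES;

  static {
    Map<TypeName, Integer> sizes = new EnumMap<>(TypeName.class);
    sizes.put(TypeName.BYTE, Byte.BYTES);
    sizes.put(TypeName.INT32, Integer.BYTES);
    sizes.put(TypeName.INT64, Long.BYTES);
    sizes.put(TypeName.DOUBLE, Double.BYTES);
    ESTIMATED_FIELD_SIZES_BYTES = Collections.unmodifiableMap(sizes);
  }

  // The accessor repeats the unit in its own name.
  static long estimatedFieldSizeBytes(TypeName type) {
    return ESTIMATED_FIELD_SIZES_BYTES.get(type);
  }

  private FieldSizes() {}
}
```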
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java,
line 79 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9Lit5n4nmMBm0PsKNI:-L9Lit5n4nmMBm0PsKNJ:bft3uyf)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java#L79)):*
> ```Java
> * Return the estimated serialized size of a give row object.
> */
> public static long estimatedSizeBytes(Row row) {
> ```
And since the particular field coders would be per-instance, this would become a
non-static method, and so on.
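A sketch of the per-instance version, assuming a hypothetical `SizedFieldCoder` interface and `PerInstanceRowCoder` class (not Beam API), with a row represented as a plain `List` of values:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, reduced field coder that can estimate one value's encoded size.
interface SizedFieldCoder {
  long estimatedSizeBytes(Object value);
}

// Hypothetical row coder that owns its per-field coders.
final class PerInstanceRowCoder {
  private final List<SizedFieldCoder> fieldCoders;

  PerInstanceRowCoder(List<SizedFieldCoder> fieldCoders) {
    this.fieldCoders = Collections.unmodifiableList(new ArrayList<>(fieldCoders));
  }

  // Non-static: the estimate depends on this instance's field coders,
  // not on a global static table.
  long estimatedSizeBytes(List<?> rowValues) {
    if (rowValues.size() != fieldCoders.size()) {
      throw new IllegalArgumentException(
          "Expected " + fieldCoders.size() + " values, got " + rowValues.size());
    }
    long totalBytes = 0;
    for (int i = 0; i < fieldCoders.size(); i++) {
      totalBytes += fieldCoders.get(i).estimatedSizeBytes(rowValues.get(i));
    }
    return totalBytes;
  }
}
```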
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java, line 296
at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9LiPXj0UF0Su-q34K4:-L9LiPXj0UF0Su-q34K5:b-kxeoth)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L296)):*
> ```Java
> * PCollection<BankTransaction> transactions = ...;
> * transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
> * .by(Transaction::getTypeName)
> ```
There are a bunch of tiny renames that I don't really get.
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java, line
18 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9H0EwC1Ny6SIRPGLDb:-L9H0EwC1Ny6SIRPGLDc:b-8ioiqz)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L18)):*
> ```Java
> * limitations under the License.
> */
> package org.apache.beam.sdk.schemas;
> ```
Just to keep things simple: is it useful to have a package boundary here? I
would imagine this fits in pretty well with `org.apache.beam.sdk.values` if we
intend to integrate it more deeply. That would also allow the
value classes and schemas to have package-private interactions, which might
come in handy.
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java, line
42 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9H0im88YzHWgZ8J7NL:-L9H0im88YzHWgZ8J7NM:bkcnfro)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L42)):*
> ```Java
>
> /**
> * {@link Schema} describes the fields in {@link Row}.
> ```
Doesn't the schema describe other POJOs, too? I thought Row was just the
quintessential dynamically-generated schema-ified object.
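To illustrate that a schema is a description that could apply to any POJO, here is a sketch that derives a name-to-type mapping from a class by reflection. `Transaction` and `PojoSchemas` are hypothetical, and `Class<?>` stands in for whatever field-type representation a schema ends up using.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical POJO standing in for a user type such as a transaction record.
final class Transaction {
  String typeName;
  long amountCents;
}

// Hypothetical helper: derives a field-name -> Java-type description from any POJO.
final class PojoSchemas {
  static Map<String, Class<?>> inferSchema(Class<?> pojoClass) {
    Map<String, Class<?>> schema = new LinkedHashMap<>();
    // Note: getDeclaredFields() makes no ordering guarantee, so a real
    // implementation would need an explicit, stable field order.
    for (Field field : pojoClass.getDeclaredFields()) {
      // Skip statics and compiler-generated fields; only instance state is described.
      if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
        continue;
      }
      schema.put(field.getName(), field.getType());
    }
    return Collections.unmodifiableMap(schema);
  }

  private PojoSchemas() {}
}
```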
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java, line
43 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9H0c4t4yUL1jNihHF9:-L9H0c4u11DIyw_ydAd8:b6c5iww)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L43)):*
> ```Java
> /**
> * {@link Schema} describes the fields in {@link Row}.
> *
> ```
nit: autoformat
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java, line
49 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9H18Fs7StuJQgTxkGw:-L9H18Fs7StuJQgTxkGx:bokabpy)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L49)):*
> ```Java
> // A mapping between field names an indices.
> private BiMap<String, Integer> fieldIndices = HashBiMap.create();
> private List<Field> fields;
> ```
Is this for efficiency? Not pressing, but I would imagine the primary way of
interacting with a schema would be by name, and secondarily by index. (So,
absent a performance reason to do otherwise, I would just store a name-to-index
mapping.)
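A sketch of that layout, assuming hypothetical reduced `SchemaField` and `SketchSchema` classes: fields are kept in declaration order, with a one-way name-to-index map. Index-to-name lookup needs no second map, since the list already answers it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, reduced schema field: a name and a type label.
final class SchemaField {
  final String name;
  final String type;

  SchemaField(String name, String type) {
    this.name = name;
    this.type = type;
  }
}

// Hypothetical schema: ordered fields plus a one-way name-to-index map.
final class SketchSchema {
  private final List<SchemaField> fields;
  private final Map<String, Integer> indexByName = new HashMap<>();

  SketchSchema(List<SchemaField> fields) {
    this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
    for (int i = 0; i < this.fields.size(); i++) {
      String name = this.fields.get(i).name;
      if (indexByName.put(name, i) != null) {
        throw new IllegalArgumentException("Duplicate field name: " + name);
      }
    }
  }

  // Primary access path: by name.
  int indexOf(String name) {
    Integer index = indexByName.get(name);
    if (index == null) {
      throw new IllegalArgumentException("No field named " + name);
    }
    return index;
  }

  SchemaField getField(String name) {
    return fields.get(indexOf(name));
  }

  // Secondary access path: by position. The reverse lookup is just the list.
  SchemaField getField(int index) {
    return fields.get(index);
  }
}
```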
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java, line
75 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9H1QQfANlp3iIR82In:-L9H1QQfANlp3iIR82Io:bt2lltm)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L75)):*
> ```Java
> }
>
> public Builder addByteField(String name, boolean nullable) {
> ```
TBH I would ditch these builder methods. A big smell is the boolean parameter:
for readability you'd want two methods, `addField` and `addNullableField`. And
at that point it is just as easy to pass in a field descriptor that carries its
nullability and have a single `addField`.
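A sketch of the shape suggested here, with hypothetical `FieldSpec` and `SchemaBuilder` classes (not the PR's API) and type names as plain strings for brevity:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical field descriptor that carries its own nullability.
final class FieldSpec {
  final String name;
  final String type;
  final boolean nullable;

  private FieldSpec(String name, String type, boolean nullable) {
    this.name = name;
    this.type = type;
    this.nullable = nullable;
  }

  static FieldSpec of(String name, String type) {
    return new FieldSpec(name, type, false);
  }

  static FieldSpec nullableOf(String name, String type) {
    return new FieldSpec(name, type, true);
  }
}

// Hypothetical builder: one general addField plus readable shorthands, no boolean flags.
final class SchemaBuilder {
  private final List<FieldSpec> fields = new ArrayList<>();

  SchemaBuilder addField(FieldSpec field) {
    fields.add(field);
    return this;
  }

  SchemaBuilder addField(String name, String type) {
    return addField(FieldSpec.of(name, type));
  }

  SchemaBuilder addNullableField(String name, String type) {
    return addField(FieldSpec.nullableOf(name, type));
  }

  List<FieldSpec> build() {
    return Collections.unmodifiableList(new ArrayList<>(fields));
  }
}
```

Compare `addField("f_byte", "BYTE")` with `addByteField("f_byte", false)`: the default case needs no flag, and nullability is spelled out only where it applies.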
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java, line
190 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9LhAy01xzEGPIAktaw:-L9LhAy01xzEGPIAktax:b-1qg7ta)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L190)):*
> ```Java
> * An enumerated list of supported types.
> */
> public enum TypeName {
> ```
I really think this is contrary to the spirit of Beam. Isn't the idea of a
Schema that it might apply to "any" data type? I know we've talked about it. I
would expect that a schema / row bottoms out at either a Coder (if portable) or
a TypeDescriptor (if Java). I'm not sure we should replicate lists of
basic types; I'd rather leave some of that to other tooling, like SQL or
connectors.
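One way to read "bottoms out at a TypeDescriptor", sketched with a plain `Class<T>` as a simplified stand-in for Beam's `TypeDescriptor` (which can also capture generic type arguments). `TypedField` is hypothetical; the point is that the set of field types stays open instead of being an enumerated list.

```java
import java.util.Objects;

// Hypothetical field whose type is an open-ended Java type token rather than
// a member of a closed TypeName enum. Class<T> simplifies TypeDescriptor.
final class TypedField<T> {
  final String name;
  final Class<T> javaType;

  TypedField(String name, Class<T> javaType) {
    this.name = Objects.requireNonNull(name);
    this.javaType = Objects.requireNonNull(javaType);
  }

  // Checked conversion that works for any Java type, including user-defined ones.
  T cast(Object value) {
    return javaType.cast(value);
  }

  @Override
  public String toString() {
    return name + ": " + javaType.getName();
  }
}
```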
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java, line 130
at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9Lhi892EZ9mWijwBhZ:-L9Lhi892EZ9mWijwBh_:bf3wfx)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java#L130)):*
> ```Java
>
> /**
> * Get a {@link TypeName#DECIMAL} value by field name, {@link IllegalStateException} is thrown
> ```
I don't much like this change of exception. `IllegalStateException` is generally
about mutable objects in the wrong state, not about immutable values. I think
`ClassCastException` is actually fairly accurate: we have a dynamically
typed row, and the method is an implicit cast. I acknowledge that this is a gray
area where the change is sort of OK.
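A sketch of the "implicit cast" view, assuming a hypothetical `SketchRow` backed by a name-to-value map. A wrong-type access fails with the same `ClassCastException` an explicit cast would raise; treating an unknown field name as `IllegalArgumentException` is an additional assumption of this sketch.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, reduced dynamically typed row: values looked up by field name.
final class SketchRow {
  private final Map<String, Object> values;

  SketchRow(Map<String, Object> values) {
    this.values = new HashMap<>(values);
  }

  // Typed getter as an implicit cast: a wrong-type value surfaces as
  // ClassCastException, exactly as an explicit (BigDecimal) cast would.
  BigDecimal getDecimal(String fieldName) {
    if (!values.containsKey(fieldName)) {
      throw new IllegalArgumentException("Unknown field: " + fieldName);
    }
    return (BigDecimal) values.get(fieldName);
  }
}
```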
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRowTypeFactory.java,
line 49 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9Lj6Jn5TFuSHmqJGhK:-L9Lj6Jn5TFuSHmqJGhL:b-vfdmfs)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRowTypeFactory.java#L49)):*
> ```Java
> *
> */
> public class DefaultRowTypeFactory implements RowTypeFactory {
> ```
Definitely file a follow-up to carry the rename through and make `RowType`
disappear.
---
*[sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeGetters.java,
line 43 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9LjEoX87FFINCT4rg4:-L9LjEoX87FFINCT4rg5:bti6oin)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeGetters.java#L43)):*
> ```Java
> * Returns a {@link Schema}.
> */
> Schema rowType() {
> ```
Rename `rowType` -> `schema`.
---
*[sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java,
line 43 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9LjMa75DH6Jgn9Rm7J:-L9LjMa75DH6Jgn9Rm7K:bhxgksq)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java#L43)):*
> ```Java
> public class RowCoderTest {
>
> void checkEncodeDecode(Row row) throws IOException {
> ```
Use `CoderProperties` rather than a hand-rolled encode/decode check.
---
*[sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java, line
77 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9Ljham0IV_XW3nmZgU:-L9Ljham0IV_XW3nmZgV:b-az0xin)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java#L77)):*
> ```Java
> public void testCreatesRecord() {
> Schema schema = Schema.builder()
> .addByteField("f_byte", false)
> ```
So, like here, just drop the boolean in the default case.
---
*[sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java,
line 41 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9LvyxX6paAq7fVIRm_:-L9LvyxX6paAq7fVIRma:b9k9z76)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L41)):*
> ```Java
> * Built-in aggregations functions for COUNT/MAX/MIN/SUM/AVG/VAR_POP/VAR_SAMP.
> *
> * <p>TODO: Consider making the interface in terms of (1-column) rows. reuvenlax
> ```
I think for Beam the best thing to do is `TODO(url to JIRA)`, rather than
attributing the TODO to a person.
---
*[sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java,
line 46 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9LvtmP7Yumz0Nk2lgK:-L9LvtmP7Yumz0Nk2lgL:bmd3gah)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L46)):*
> ```Java
> private static MathContext mc = new MathContext(10,
> RoundingMode.HALF_UP);
> ```
Random whitespace? Maybe just autoformat the whole SQL directory anyhow.
---
*[sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java,
line 64 at
r9](https://beta.reviewable.io/reviews/apache/beam/4964#-L9LwKJF8YcDSyw0BMwo:-L9LwKJF8YcDSyw0BMwp:br80y21)
([raw
file](https://github.com/apache/beam/blob/d28693b35568d8ebee30301329c77b2cc2feaf26/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java#L64)):*
> ```Java
> .put(TypeName.DATETIME.type().withMetadata("DATE"), SqlTypeName.DATE)
> .put(TypeName.DATETIME.type().withMetadata("TIME"), SqlTypeName.TIME)
> .put(TypeName.DATETIME.type().withMetadata("TIME_WITH_LOCAL_TZ"),
> ```
I keep coming back to the fact that this really looks like it should be a
`SqlTypeCoder` that contains (a) a delegate coder for doing the encoding and
(b) metadata that says what the SQL type should be. I don't think Beam actually
needs the concept of types.
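A minimal sketch of that idea, assuming a hypothetical `SimpleCoder` interface in place of Beam's `Coder` and a hypothetical `SqlType` enum in place of Calcite's `SqlTypeName`. DATE and TIME share one delegate encoding and differ only in metadata, which is roughly what the `withMetadata(...)` entries above express.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, reduced stand-in for Beam's Coder<T>.
interface SimpleCoder<T> {
  void encode(T value, DataOutputStream out) throws IOException;

  T decode(DataInputStream in) throws IOException;
}

// Hypothetical stand-in for the SQL type names a dialect cares about.
enum SqlType { DATE, TIME, TIMESTAMP }

// Hypothetical SqlTypeCoder: (a) delegates all encoding to another coder and
// (b) carries metadata saying which SQL type the encoded value represents.
final class SqlTypeCoder<T> implements SimpleCoder<T> {
  private final SimpleCoder<T> delegate;
  private final SqlType sqlType;

  SqlTypeCoder(SimpleCoder<T> delegate, SqlType sqlType) {
    this.delegate = delegate;
    this.sqlType = sqlType;
  }

  SqlType sqlType() {
    return sqlType;
  }

  @Override
  public void encode(T value, DataOutputStream out) throws IOException {
    delegate.encode(value, out);
  }

  @Override
  public T decode(DataInputStream in) throws IOException {
    return delegate.decode(in);
  }
}

// Hypothetical delegate: encodes a millis-since-epoch value as 8 bytes.
final class EpochMillisCoder implements SimpleCoder<Long> {
  @Override
  public void encode(Long value, DataOutputStream out) throws IOException {
    out.writeLong(value);
  }

  @Override
  public Long decode(DataInputStream in) throws IOException {
    return in.readLong();
  }
}

final class CoderSketches {
  // Encodes then decodes a value, wrapping I/O failures as unchecked.
  static <T> T roundTrip(SimpleCoder<T> coder, T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      coder.encode(value, out);
      out.flush();
      return coder.decode(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  private CoderSketches() {}
}
```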
---
*Comments from
[Reviewable](https://beta.reviewable.io/reviews/apache/beam/4964#-:-L9LgLO6Doop1Bw8wgkK:bz7v0nu)*
Issue Time Tracking
-------------------
Worklog Id: (was: 88174)
Time Spent: 7h 50m (was: 7h 40m)
> Support schema in PCollections
> ------------------------------
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
> Issue Type: Wish
> Components: beam-model
> Reporter: Jean-Baptiste Onofré
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Time Spent: 7h 50m
> Remaining Estimate: 0h
>
> As discussed with some people on the team, it would be great to add schema
> support in {{PCollections}}. It would allow us to:
> 1. Expect some data type in {{PTransforms}}.
> 2. Improve some runners with additional features (I'm thinking about the Spark
> runner with data frames, for instance).
> A technical draft document has been created:
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=AAAABhykQIs&ts=5a203b46&usp=comment_email_document
> I also started a PoC on a branch; I will update this Jira with a "discussion"
> PR.