[
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86855&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86855
]
ASF GitHub Bot logged work on BEAM-3437:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m
Work Description: akedin commented on a change in pull request #4964:
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178629710
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
##########
@@ -17,69 +17,146 @@
*/
package org.apache.beam.sdk.coders;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.BitSet;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema.FieldTypeDescriptor;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.RowType;
/**
* A {@link Coder} for {@link Row}. It wraps the {@link Coder} for each
element directly.
*/
@Experimental
public class RowCoder extends CustomCoder<Row> {
+ private static final Map<TypeName, Coder> CODER_MAP =
ImmutableMap.<TypeName, Coder>builder()
+ .put(TypeName.BYTE, ByteCoder.of())
+ .put(TypeName.INT16, BigEndianShortCoder.of())
+ .put(TypeName.INT32, BigEndianIntegerCoder.of())
+ .put(TypeName.INT64, BigEndianLongCoder.of())
+ .put(TypeName.DECIMAL, BigDecimalCoder.of())
+ .put(TypeName.FLOAT, FloatCoder.of())
+ .put(TypeName.DOUBLE, DoubleCoder.of())
+ .put(TypeName.STRING, StringUtf8Coder.of())
+ .put(TypeName.DATETIME, InstantCoder.of())
+ .put(TypeName.BOOLEAN, BooleanCoder.of())
+ .build();
+
+ private static final Map<TypeName, Integer> ESTIMATED_FIELD_SIZES =
+ ImmutableMap.<TypeName, Integer>builder()
+ .put(TypeName.BYTE, Byte.BYTES)
+ .put(TypeName.INT16, Short.BYTES)
+ .put(TypeName.INT32, Integer.BYTES)
+ .put(TypeName.INT64, Long.BYTES)
+ .put(TypeName.FLOAT, Float.BYTES)
+ .put(TypeName.DOUBLE, Double.BYTES)
+ .put(TypeName.DECIMAL, 32)
+ .put(TypeName.BOOLEAN, 1)
+ .put(TypeName.DATETIME, Long.BYTES)
+ .build();
+
private static final BitSetCoder nullListCoder = BitSetCoder.of();
- private RowType rowType;
- private List<Coder> coders;
+ private Schema schema;
+
+ /**
+ * Returns the coder used for a given primitive type.
+ */
+ public static <T> Coder<T> coderForPrimitiveType(TypeName typeName) {
+ return (Coder<T>) CODER_MAP.get(typeName);
+ }
+
+ /**
+ * Return the estimated serialized size of a give row object.
+ */
+ public static long estimatedSizeBytes(Row row) {
Review comment:
Does this belong to `RowCoder`? Right now this method doesn't do anything
related to encoding/decoding and only uses `Schema` and `Row`. Would it be
better to have this interface on `Schema`? Or probably just a separate utility
class since it's not frequently used?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 86855)
Time Spent: 3h 20m (was: 3h 10m)
> Support schema in PCollections
> ------------------------------
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
> Issue Type: Wish
> Components: beam-model
> Reporter: Jean-Baptiste Onofré
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark
> runner with data frames for instance).
> A technical draft document has been created:
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=AAAABhykQIs&ts=5a203b46&usp=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion"
> PR.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)