[
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=87163&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87163
]
ASF GitHub Bot logged work on BEAM-3437:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Apr/18 17:08
Start Date: 03/Apr/18 17:08
Worklog Time Spent: 10m
Work Description: reuvenlax commented on a change in pull request #4964:
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178896184
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
##########
@@ -17,69 +17,146 @@
*/
package org.apache.beam.sdk.coders;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.BitSet;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema.FieldTypeDescriptor;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.RowType;
/**
* A {@link Coder} for {@link Row}. It wraps the {@link Coder} for each
element directly.
*/
@Experimental
public class RowCoder extends CustomCoder<Row> {
+ private static final Map<TypeName, Coder> CODER_MAP =
ImmutableMap.<TypeName, Coder>builder()
+ .put(TypeName.BYTE, ByteCoder.of())
+ .put(TypeName.INT16, BigEndianShortCoder.of())
+ .put(TypeName.INT32, BigEndianIntegerCoder.of())
+ .put(TypeName.INT64, BigEndianLongCoder.of())
+ .put(TypeName.DECIMAL, BigDecimalCoder.of())
+ .put(TypeName.FLOAT, FloatCoder.of())
+ .put(TypeName.DOUBLE, DoubleCoder.of())
+ .put(TypeName.STRING, StringUtf8Coder.of())
+ .put(TypeName.DATETIME, InstantCoder.of())
+ .put(TypeName.BOOLEAN, BooleanCoder.of())
+ .build();
+
+ private static final Map<TypeName, Integer> ESTIMATED_FIELD_SIZES =
+ ImmutableMap.<TypeName, Integer>builder()
+ .put(TypeName.BYTE, Byte.BYTES)
+ .put(TypeName.INT16, Short.BYTES)
+ .put(TypeName.INT32, Integer.BYTES)
+ .put(TypeName.INT64, Long.BYTES)
+ .put(TypeName.FLOAT, Float.BYTES)
+ .put(TypeName.DOUBLE, Double.BYTES)
+ .put(TypeName.DECIMAL, 32)
+ .put(TypeName.BOOLEAN, 1)
+ .put(TypeName.DATETIME, Long.BYTES)
+ .build();
+
private static final BitSetCoder nullListCoder = BitSetCoder.of();
- private RowType rowType;
- private List<Coder> coders;
+ private Schema schema;
+
+ /**
+ * Returns the coder used for a given primitive type.
+ */
+ public static <T> Coder<T> coderForPrimitiveType(TypeName typeName) {
+ return (Coder<T>) CODER_MAP.get(typeName);
+ }
+
+ /**
+ * Return the estimated serialized size of a give row object.
+ */
+ public static long estimatedSizeBytes(Row row) {
Review comment:
I think it is associated on RowCoder, because it is specific to how a Row is
encoded by this coder. Logically the Row object knows nothing about its coder.
In fact we might have multiple Coders for Row, and each one might have a
different estimated size.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 87163)
Time Spent: 5h 10m (was: 5h)
> Support schema in PCollections
> ------------------------------
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
> Issue Type: Wish
> Components: beam-model
> Reporter: Jean-Baptiste Onofré
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Time Spent: 5h 10m
> Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark
> runner with data frames for instance).
> A technical draft document has been created:
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=AAAABhykQIs&ts=5a203b46&usp=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion"
> PR.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)