[
https://issues.apache.org/jira/browse/BEAM-3437?focusedWorklogId=86863&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86863
]
ASF GitHub Bot logged work on BEAM-3437:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Apr/18 00:04
Start Date: 03/Apr/18 00:04
Worklog Time Spent: 10m
Work Description: akedin commented on a change in pull request #4964:
[BEAM-3437] Introduce Schema class, and use it in BeamSQL
URL: https://github.com/apache/beam/pull/4964#discussion_r178676886
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
##########
@@ -17,69 +17,146 @@
*/
package org.apache.beam.sdk.coders;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.BitSet;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldTypeDescriptor;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.RowType;
/**
* A {@link Coder} for {@link Row}. It wraps the {@link Coder} for each
element directly.
*/
@Experimental
public class RowCoder extends CustomCoder<Row> {
+ private static final Map<TypeName, Coder> CODER_MAP =
ImmutableMap.<TypeName, Coder>builder()
+ .put(TypeName.BYTE, ByteCoder.of())
+ .put(TypeName.INT16, BigEndianShortCoder.of())
+ .put(TypeName.INT32, BigEndianIntegerCoder.of())
+ .put(TypeName.INT64, BigEndianLongCoder.of())
+ .put(TypeName.DECIMAL, BigDecimalCoder.of())
+ .put(TypeName.FLOAT, FloatCoder.of())
+ .put(TypeName.DOUBLE, DoubleCoder.of())
+ .put(TypeName.STRING, StringUtf8Coder.of())
+ .put(TypeName.DATETIME, InstantCoder.of())
+ .put(TypeName.BOOLEAN, BooleanCoder.of())
+ .build();
+
+ private static final Map<TypeName, Integer> ESTIMATED_FIELD_SIZES =
+ ImmutableMap.<TypeName, Integer>builder()
+ .put(TypeName.BYTE, Byte.BYTES)
+ .put(TypeName.INT16, Short.BYTES)
+ .put(TypeName.INT32, Integer.BYTES)
+ .put(TypeName.INT64, Long.BYTES)
+ .put(TypeName.FLOAT, Float.BYTES)
+ .put(TypeName.DOUBLE, Double.BYTES)
+ .put(TypeName.DECIMAL, 32)
+ .put(TypeName.BOOLEAN, 1)
+ .put(TypeName.DATETIME, Long.BYTES)
+ .build();
+
private static final BitSetCoder nullListCoder = BitSetCoder.of();
- private RowType rowType;
- private List<Coder> coders;
+ private Schema schema;
+
+ /**
+ * Returns the coder used for a given primitive type.
+ */
+ public static <T> Coder<T> coderForPrimitiveType(TypeName typeName) {
+ return (Coder<T>) CODER_MAP.get(typeName);
+ }
+
+ /**
+ * Return the estimated serialized size of a given row object.
+ */
+ public static long estimatedSizeBytes(Row row) {
+ Schema schema = row.getSchema();
+ int fieldCount = schema.getFieldCount();
+ int bitmapSize = (((fieldCount - 1) >> 6) + 1) * 8;
- private RowCoder(RowType rowType, List<Coder> coders) {
- this.rowType = rowType;
- this.coders = coders;
+ int fieldsSize = 0;
+ for (int i = 0; i < schema.getFieldCount(); ++i) {
+ fieldsSize += estimatedSizeBytes(schema.getField(i).getTypeDescriptor(),
row.getValue(i));
+ }
+ return bitmapSize + fieldsSize;
}
- public static RowCoder of(RowType rowType, List<Coder> coderArray) {
- if (rowType.getFieldCount() != coderArray.size()) {
- throw new IllegalArgumentException("Coder size doesn't match with field
size");
+ private static long estimatedSizeBytes(FieldTypeDescriptor typeDescriptor,
Object value) {
+ switch (typeDescriptor.getType()) {
+ case ROW:
+ return estimatedSizeBytes((Row) value);
+ case ARRAY:
+ List list = (List) value;
+ long listSizeBytes = 0;
+ for (Object elem : list) {
+ listSizeBytes +=
estimatedSizeBytes(typeDescriptor.getComponentType(), elem);
+ }
+ return 4 + listSizeBytes;
+ case STRING:
+ // Not always accurate - String.getBytes().length would be more
accurate here, but slower.
+ return ((String) value).length();
+ default:
+ return ESTIMATED_FIELD_SIZES.get(typeDescriptor.getType());
}
- return new RowCoder(rowType, coderArray);
}
- public RowType getRowType() {
- return rowType;
+ private RowCoder(Schema schema) {
+ this.schema = schema;
+ }
+
+ public static RowCoder of(Schema schema) {
+ return new RowCoder(schema);
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ Coder getCoder(FieldTypeDescriptor fieldTypeDescriptor) {
+ if (TypeName.ARRAY.equals(fieldTypeDescriptor.getType())) {
+ return ListCoder.of(getCoder(fieldTypeDescriptor.getComponentType()));
Review comment:
We should probably still keep the `Map<FieldTypeDescriptor, Coder>` to cache
the coders, so that a new `ListCoder` is not built on every `getCoder` call.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 86863)
Time Spent: 4.5h (was: 4h 20m)
> Support schema in PCollections
> ------------------------------
>
> Key: BEAM-3437
> URL: https://issues.apache.org/jira/browse/BEAM-3437
> Project: Beam
> Issue Type: Wish
> Components: beam-model
> Reporter: Jean-Baptiste Onofré
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> As discussed with some people in the team, it would be great to add schema
> support in {{PCollections}}. It will allow us:
> 1. To expect some data type in {{PTransforms}}
> 2. Improve some runners with additional features (I'm thinking about Spark
> runner with data frames for instance).
> A technical draft document has been created:
> https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit?disco=AAAABhykQIs&ts=5a203b46&usp=comment_email_document
> I also started a PoC on a branch, I will update this Jira with a "discussion"
> PR.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)