[https://issues.apache.org/jira/browse/BEAM-10277?focusedWorklogId=586974&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-586974]
ASF GitHub Bot logged work on BEAM-10277:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Apr/21 00:10
Start Date: 22/Apr/21 00:10
Worklog Time Spent: 10m
Work Description: reuvenlax commented on a change in pull request #14591:
URL: https://github.com/apache/beam/pull/14591#discussion_r617974791
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
##########
@@ -336,27 +387,30 @@ public InstrumentedType prepare(InstrumentedType instrumentedType) {
// The decode method of the generated Coder delegates to this method to evaluate all of the
// per-field Coders.
- static Row decodeDelegate(Schema schema, Coder[] coders, InputStream inputStream)
+ static Row decodeDelegate(
+ Schema schema, Coder[] coders, int[] encodingPosToIndex, InputStream inputStream)
throws IOException {
int fieldCount = VAR_INT_CODER.decode(inputStream);
BitSet nullFields = NULL_LIST_CODER.decode(inputStream);
- List<Object> fieldValues = Lists.newArrayListWithCapacity(coders.length);
- for (int i = 0; i < fieldCount; ++i) {
+ Object[] fieldValues = new Object[coders.length];
+ for (int encodingPos = 0; encodingPos < fieldCount; ++encodingPos) {
+ int rowIndex = encodingPosToIndex[encodingPos];
// In the case of a schema change going backwards, fieldCount might be > coders.length,
// in which case we drop the extra fields.
Review comment:
Added test. This is mostly so that if people do an emergency rollback of an update,
things don't start crashing on them. (Ideally you would roll back to a previous
state snapshot, but in emergency situations it's nice that something works.)
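The decode loop in this hunk can be illustrated outside Beam. The following is a minimal sketch, not the code from the PR: the class `DecodeByEncodingPosition` and its method `toSchemaOrder` are hypothetical, and the sketch assumes the field values have already been decoded in wire (encoding) order, so it skips the `VAR_INT_CODER` and `NULL_LIST_CODER` handling. It shows the two behaviors discussed in this thread: values are placed by `encodingPosToIndex`, and extra fields written by a newer schema are dropped instead of causing a crash.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch, not Beam's RowCoderGenerator. It takes values that
// are already decoded in wire (encoding) order and places them by schema index.
class DecodeByEncodingPosition {
  static Object[] toSchemaOrder(
      List<Object> encodedValues, int[] encodingPosToIndex, int schemaFieldCount) {
    Object[] fieldValues = new Object[schemaFieldCount];
    for (int encodingPos = 0; encodingPos < encodedValues.size(); ++encodingPos) {
      if (encodingPos >= encodingPosToIndex.length) {
        // Written by a newer schema with more fields (e.g. after an emergency
        // rollback): drop the extra trailing fields instead of failing.
        break;
      }
      fieldValues[encodingPosToIndex[encodingPos]] = encodedValues.get(encodingPos);
    }
    // Fields that the encoded row did not contain are left null.
    return fieldValues;
  }

  public static void main(String[] args) {
    // Schema order is [id, name, score]; the wire order is [name, score, id].
    int[] encodingPosToIndex = {1, 2, 0};
    Object[] row = toSchemaOrder(List.of("alice", 0.5, 7L), encodingPosToIndex, 3);
    System.out.println(Arrays.toString(row)); // prints [7, alice, 0.5]
  }
}
```

Rows written by an older, shorter schema go the other way: the loop simply ends early and the missing fields stay `null`.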
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java
##########
@@ -61,11 +62,15 @@ public CloudObject toCloudObject(SchemaCoder target, SdkComponents sdkComponents
FROM_ROW_FUNCTION,
StringUtils.byteArrayToJsonString(
SerializableUtils.serializeToByteArray(target.getFromRowFunction())));
- Structs.addString(
- base,
- SCHEMA,
- StringUtils.byteArrayToJsonString(
- SchemaTranslation.schemaToProto(target.getSchema(), true).toByteArray()));
+
+ try {
+ Structs.addString(
+ base,
+ SCHEMA,
+ JsonFormat.printer().print(SchemaTranslation.schemaToProto(target.getSchema(), true)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
Review comment:
yes
--
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 586974)
Time Spent: 0.5h (was: 20m)
> beam:coder:row:v1 implementations should respect encoding_position
> ------------------------------------------------------------------
>
> Key: BEAM-10277
> URL: https://issues.apache.org/jira/browse/BEAM-10277
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core, sdk-py-core
> Reporter: Brian Hulette
> Priority: P3
> Labels: Clarified
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> h3. Problem/Status
> The schema proto has an [encoding_position field|https://github.com/apache/beam/blob/2c619c81082839e054f16efee9311b9f74b6e436/model/pipeline/src/main/proto/schema.proto#L55]
> that is currently unused in every row coder implementation. This field is
> intended to specify an alternative order in which
> [beam:coder:row:v1 implementations|https://github.com/apache/beam/blob/1e60f383fb39b9ff8d44edcbe5357da4c1e52378/model/pipeline/src/main/proto/beam_runner_api.proto#L937-L990]
> encode the fields. Currently all the implementations ignore this field and
> always encode the fields in the order in which they appear in the schema.
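As a rough illustration of what respecting encoding_position means, here is a minimal Java sketch. It is not taken from any Beam SDK; the class `EncodeByEncodingPosition` and its methods are hypothetical, it works on plain field values rather than bytes, and it assumes the encoding positions of a schema form a permutation of 0..n-1. The value of schema field i is written to wire slot `encodingPositions[i]`, and inverting that mapping gives the wire-slot-to-schema-index table a decoder needs.

```java
import java.util.Arrays;

// Hypothetical sketch: assumes encoding positions form a permutation of 0..n-1.
class EncodeByEncodingPosition {
  // Returns field values in wire order: the value of schema field i goes to
  // slot encodingPositions[i] instead of slot i.
  static Object[] toWireOrder(Object[] fieldValues, int[] encodingPositions) {
    Object[] wire = new Object[fieldValues.length];
    for (int i = 0; i < fieldValues.length; ++i) {
      wire[encodingPositions[i]] = fieldValues[i];
    }
    return wire;
  }

  // Inverts the mapping so a decoder can go from wire slot back to schema index
  // (the role encodingPosToIndex plays in the decodeDelegate change in this thread).
  static int[] encodingPosToIndex(int[] encodingPositions) {
    int[] inverse = new int[encodingPositions.length];
    for (int i = 0; i < encodingPositions.length; ++i) {
      inverse[encodingPositions[i]] = i;
    }
    return inverse;
  }

  public static void main(String[] args) {
    // Schema order is [id, name, score] with encoding positions {2, 0, 1}.
    int[] encodingPositions = {2, 0, 1};
    Object[] row = {7L, "alice", 0.5};
    System.out.println(Arrays.toString(toWireOrder(row, encodingPositions))); // prints [alice, 0.5, 7]
    System.out.println(Arrays.toString(encodingPosToIndex(encodingPositions))); // prints [1, 2, 0]
  }
}
```

When every field's encoding position equals its schema index, the wire order is identical to the schema order, which is the behavior all implementations have today.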
> h3. Motivation
> The encoding position is meant to give runners a way to enforce schema
> compatibility (BEAM-9502) by reordering the way fields are encoded when the
> schema changes between two job submissions. Schema changes may come from
> reordering fields or from adding or deleting fields.
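One way a runner might use encoding positions to keep an updated schema compatible is sketched below. This is a hypothetical policy, not something the issue or Beam specifies, and `CompatibleEncodingPositions.assign` is an illustrative name: fields that existed in the old schema keep their old wire slot, and newly added fields are appended after them. Field deletions are deliberately rejected because they would leave a gap in the positions; handling them is out of scope for this sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical policy, not Beam's: handles field reordering and additions only.
class CompatibleEncodingPositions {
  // Chooses encoding positions for newFields so that every field that already
  // existed in oldFields keeps its old wire slot, and new fields are appended.
  static int[] assign(List<String> oldFields, List<String> newFields) {
    if (!newFields.containsAll(oldFields)) {
      throw new IllegalArgumentException("field deletions are not handled by this sketch");
    }
    Map<String, Integer> oldSlot = new HashMap<>();
    for (int i = 0; i < oldFields.size(); ++i) {
      oldSlot.put(oldFields.get(i), i);
    }
    int[] positions = new int[newFields.size()];
    int nextSlot = oldFields.size();
    for (int i = 0; i < newFields.size(); ++i) {
      Integer slot = oldSlot.get(newFields.get(i));
      positions[i] = (slot != null) ? slot : nextSlot++;
    }
    return positions;
  }

  public static void main(String[] args) {
    // Old schema [id, name]; the new schema swaps them and adds score.
    int[] positions = assign(List.of("id", "name"), List.of("name", "id", "score"));
    System.out.println(Arrays.toString(positions)); // prints [1, 0, 2]
  }
}
```

With these positions, a row written under the old two-field schema still lines up with the new schema: `id` and `name` are read from their original slots, and `score` has no value on the wire.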
> h3. Code pointers
> The Python beam:coder:row:v1 implementation lives in
> [row_coder.py|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/row_coder.py].
> The Java implementation is a little more complicated, distributed between
> [SchemaCoder|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java],
> [RowCoder|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java],
> and
> [RowCoderGenerator|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java].
> RowCoderGenerator contains the code relevant to this jira: it uses
> ByteBuddy to generate the coder's bytecode at runtime. It needs to generate
> code that encodes the fields in the order specified by encoding_position.
> h3. Testing
> Python and Java implementations should be tested with unit tests
> ([RowCoderTest|https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java],
> [row_coder_test|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/row_coder_test.py]).
> We should also test them for compatibility with each other by adding test
> cases that exercise encoding_position to
> [standard_coders.yaml|https://github.com/apache/beam/blob/master/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml].
> These tests will be executed by
> [CommonCoderTest|https://github.com/apache/beam/blob/master/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java]
> and
> [standard_coders_test|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/standard_coders_test.py].
> There's some example code for generating a new test case
> [here|https://github.com/apache/beam/blob/master/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml#L387-L400].
--
This message was sent by Atlassian Jira
(v8.3.4#803005)