reuvenlax commented on a change in pull request #14591:
URL: https://github.com/apache/beam/pull/14591#discussion_r617974791
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
##########
@@ -336,27 +387,30 @@ public InstrumentedType prepare(InstrumentedType instrumentedType) {
// The decode method of the generated Coder delegates to this method to evaluate all of the
// per-field Coders.
- static Row decodeDelegate(Schema schema, Coder[] coders, InputStream inputStream)
+ static Row decodeDelegate(
+ Schema schema, Coder[] coders, int[] encodingPosToIndex, InputStream inputStream)
throws IOException {
int fieldCount = VAR_INT_CODER.decode(inputStream);
BitSet nullFields = NULL_LIST_CODER.decode(inputStream);
- List<Object> fieldValues = Lists.newArrayListWithCapacity(coders.length);
- for (int i = 0; i < fieldCount; ++i) {
+ Object[] fieldValues = new Object[coders.length];
+ for (int encodingPos = 0; encodingPos < fieldCount; ++encodingPos) {
+ int rowIndex = encodingPosToIndex[encodingPos];
// In the case of a schema change going backwards, fieldCount might be > coders.length,
// in which case we drop the extra fields.
Review comment:
Added a test. This is mostly so that if people do an emergency rollback of an
update, things don't start crashing on them. (Ideally you would roll back to a
previous state snapshot, but in an emergency it's nice that something works.)
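
For illustration only (not code from this PR): a minimal sketch of the
rollback case described above, where the encoded payload carries more fields
than the rolled-back schema knows about and the trailing extras are dropped.
The class name RollbackDecodeSketch, the int-only wire format (a count
followed by one int per field), and the omission of the null-field BitSet are
all assumptions made to keep the example self-contained; Beam's actual
encoding differs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical stand-in for the decode path; not Beam's RowCoderGenerator.
public class RollbackDecodeSketch {

  // Writes a field count followed by the values in encoding-position order.
  static byte[] encode(int... valuesInEncodingOrder) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(valuesInEncodingOrder.length);
      for (int value : valuesInEncodingOrder) {
        out.writeInt(value);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads fields in encoding order and stores each one at its row index.
  // encodingPosToIndex.length plays the role of coders.length in the diff.
  static Integer[] decode(byte[] payload, int[] encodingPosToIndex) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
      int fieldCount = in.readInt();
      Integer[] fieldValues = new Integer[encodingPosToIndex.length];
      for (int encodingPos = 0; encodingPos < fieldCount; ++encodingPos) {
        if (encodingPos >= encodingPosToIndex.length) {
          // Payload was written with a newer schema: the remaining fields are
          // unknown to this (older) schema, so they are dropped unread.
          break;
        }
        fieldValues[encodingPosToIndex[encodingPos]] = in.readInt();
      }
      return fieldValues;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // Newer schema wrote three fields; the rolled-back schema has two, and
    // their row order is the reverse of their encoding order.
    Integer[] row = decode(encode(10, 20, 30), new int[] {1, 0});
    System.out.println(Arrays.toString(row)); // prints [20, 10]
  }
}
```

The bounds check happens before the lookup in encodingPosToIndex, so a payload
with extra trailing fields decodes to a row of the older schema's width
instead of failing with an out-of-range index.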
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java
##########
@@ -61,11 +62,15 @@ public CloudObject toCloudObject(SchemaCoder target, SdkComponents sdkComponents
FROM_ROW_FUNCTION,
StringUtils.byteArrayToJsonString(
SerializableUtils.serializeToByteArray(target.getFromRowFunction())));
- Structs.addString(
- base,
- SCHEMA,
- StringUtils.byteArrayToJsonString(
- SchemaTranslation.schemaToProto(target.getSchema(), true).toByteArray()));
+
+ try {
+ Structs.addString(
+ base,
+ SCHEMA,
+ JsonFormat.printer().print(SchemaTranslation.schemaToProto(target.getSchema(), true)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
Review comment:
yes