reuvenlax commented on a change in pull request #14591:
URL: https://github.com/apache/beam/pull/14591#discussion_r617974791



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
##########
@@ -336,27 +387,30 @@ public InstrumentedType prepare(InstrumentedType instrumentedType) {
 
     // The decode method of the generated Coder delegates to this method to evaluate all of the
     // per-field Coders.
-    static Row decodeDelegate(Schema schema, Coder[] coders, InputStream inputStream)
+    static Row decodeDelegate(
+        Schema schema, Coder[] coders, int[] encodingPosToIndex, InputStream inputStream)
         throws IOException {
       int fieldCount = VAR_INT_CODER.decode(inputStream);
 
       BitSet nullFields = NULL_LIST_CODER.decode(inputStream);
-      List<Object> fieldValues = Lists.newArrayListWithCapacity(coders.length);
-      for (int i = 0; i < fieldCount; ++i) {
+      Object[] fieldValues = new Object[coders.length];
+      for (int encodingPos = 0; encodingPos < fieldCount; ++encodingPos) {
+        int rowIndex = encodingPosToIndex[encodingPos];
         // In the case of a schema change going backwards, fieldCount might be > coders.length,
         // in which case we drop the extra fields.

Review comment:
       Added a test. This is mostly so that things don't start crashing if 
people have to roll back an update in an emergency. (Ideally you would roll back 
to a previous state snapshot, but in an emergency it's nice that something works.)
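
The rollback case described above can be sketched in isolation. This is a 
simplified illustration, not the code in RowCoderGenerator: the class name 
RollbackDecodeSketch, the encode helper, and the wire format (an int field 
count followed by one int per field, with no null bitset) are all assumptions 
made for the example. It only shows how extra encoded fields can be dropped 
when fieldCount exceeds the number of coders the older schema knows about. 
Stopping at the first unknown field is a choice made for this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class RollbackDecodeSketch {
  // Hypothetical wire format: an int field count, then one int per field.
  static byte[] encode(int... values) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(values.length);
    for (int value : values) {
      out.writeInt(value);
    }
    out.flush();
    return bytes.toByteArray();
  }

  // coderCount stands in for coders.length in the older (rolled-back) schema.
  static Object[] decode(int coderCount, int[] encodingPosToIndex, byte[] payload)
      throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
    int fieldCount = in.readInt();
    Object[] fieldValues = new Object[coderCount];
    for (int encodingPos = 0; encodingPos < fieldCount; ++encodingPos) {
      if (encodingPos >= coderCount) {
        // Written by a newer schema: no coder exists for these fields, so this
        // sketch stops reading and drops them.
        break;
      }
      // Map the position in the encoding to the position in the row.
      int rowIndex = encodingPosToIndex[encodingPos];
      fieldValues[rowIndex] = in.readInt();
    }
    return fieldValues;
  }

  public static void main(String[] args) throws IOException {
    // Three fields were encoded, but the rolled-back schema has only two.
    byte[] payload = encode(10, 20, 30);
    Object[] row = decode(2, new int[] {1, 0}, payload);
    System.out.println(Arrays.toString(row)); // prints [20, 10]
  }
}
```

In this sketch the third value is ignored instead of causing an exception, 
which is the behavior the comment is asking the test to protect.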

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java
##########
@@ -61,11 +62,15 @@ public CloudObject toCloudObject(SchemaCoder target, SdkComponents sdkComponents
         FROM_ROW_FUNCTION,
         StringUtils.byteArrayToJsonString(
             SerializableUtils.serializeToByteArray(target.getFromRowFunction())));
-    Structs.addString(
-        base,
-        SCHEMA,
-        StringUtils.byteArrayToJsonString(
-            SchemaTranslation.schemaToProto(target.getSchema(), true).toByteArray()));
+
+    try {
+      Structs.addString(
+          base,
+          SCHEMA,
+          JsonFormat.printer().print(SchemaTranslation.schemaToProto(target.getSchema(), true)));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }

Review comment:
       yes




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

