Abacn commented on code in PR #32389:
URL: https://github.com/apache/beam/pull/32389#discussion_r1752272325


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java:
##########
@@ -425,7 +538,7 @@ static Row decodeDelegate(
         // in which case we drop the extra fields.
         if (encodingPos < coders.length) {
           int rowIndex = encodingPosToIndex[encodingPos];
-          if (nullFields.get(rowIndex)) {
+          if (nullFields.get(encodingPos)) {

Review Comment:
   Was this a bug? `rowIndex` and `encodingPos` look like different indices.
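   A minimal, hypothetical sketch that makes the question concrete. It is not Beam's `RowCoderGenerator` code, and `NullBitmapDemo` and its methods are illustrative names only. It assumes the encoder sets null bits by encoding position. Under that assumption the decoder must also read them by encoding position. Reading by row index only gives the same answer when `encodingPosToIndex` is the identity mapping.

   ```java
   import java.util.BitSet;

   /**
    * Hypothetical illustration, not Beam code: fields are written in encoding-position
    * order and the null bitmap is assumed to be indexed by encoding position.
    */
   final class NullBitmapDemo {
     private NullBitmapDemo() {}

     /** Builds a null bitmap indexed by encoding position from values in row-index order. */
     static BitSet nullBitsByEncodingPos(Object[] values, int[] encodingPosToIndex) {
       BitSet bits = new BitSet(values.length);
       for (int encodingPos = 0; encodingPos < encodingPosToIndex.length; encodingPos++) {
         int rowIndex = encodingPosToIndex[encodingPos];
         if (values[rowIndex] == null) {
           bits.set(encodingPos);
         }
       }
       return bits;
     }

     /**
      * Returns isNull[rowIndex] for each field. When readByEncodingPos is false, the
      * bitmap is read by row index instead, which is wrong under the assumption above.
      */
     static boolean[] decodeNulls(
         BitSet nullFields, int[] encodingPosToIndex, boolean readByEncodingPos) {
       boolean[] isNull = new boolean[encodingPosToIndex.length];
       for (int encodingPos = 0; encodingPos < encodingPosToIndex.length; encodingPos++) {
         int rowIndex = encodingPosToIndex[encodingPos];
         int bit = readByEncodingPos ? encodingPos : rowIndex;
         isNull[rowIndex] = nullFields.get(bit);
       }
       return isNull;
     }
   }
   ```

   With `values = {"a", null, "c"}` and `encodingPosToIndex = {2, 0, 1}`, reading by encoding position marks row index 1 as null. Reading by row index instead marks row index 2, which is wrong.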



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java:
##########
@@ -109,30 +113,121 @@ public abstract class RowCoderGenerator {
   private static final String CODERS_FIELD_NAME = "FIELD_CODERS";
   private static final String POSITIONS_FIELD_NAME = "FIELD_ENCODING_POSITIONS";
 
-  // Cache for Coder class that are already generated.
-  private static final Map<UUID, Coder<Row>> GENERATED_CODERS = Maps.newConcurrentMap();
-  private static final Map<UUID, Map<String, Integer>> ENCODING_POSITION_OVERRIDES =
+  static class WithStackTrace<T> {
+    private final T value;
+    private final String stackTrace;
+
+    public WithStackTrace(T value, String stackTrace) {
+      this.value = value;
+      this.stackTrace = stackTrace;
+    }
+
+    public T getValue() {
+      return value;
+    }
+
+    public String getStackTrace() {
+      return stackTrace;
+    }
+  }
+
+  // Cache for Coder class that are already generated. Coders are added with setOverridesLock held.
+  private static final Map<UUID, WithStackTrace<Coder<Row>>> GENERATED_CODERS =
       Maps.newConcurrentMap();
 
+  @GuardedBy("setOverridesLock")
+  private static final Map<UUID, WithStackTrace<Map<String, Integer>>> ENCODING_POSITION_OVERRIDES =
+      Maps.newHashMap();
+
+  private static final Object setOverridesLock = new Object();
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowCoderGenerator.class);
+
+  private static String getStackTrace() {
+    StringBuilder builder = new StringBuilder();
+    for (StackTraceElement e : new Throwable().getStackTrace()) {
+      builder.append("  at ").append(e).append("\n");
+    }
+    return builder.toString();
+  }
+
   public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) {
-    ENCODING_POSITION_OVERRIDES.put(uuid, encodingPositions);
+    final String stackTrace = getStackTrace();
+    synchronized (setOverridesLock) {
+      @Nullable
+      WithStackTrace<Map<String, Integer>> previousEncodingPositions =
+          ENCODING_POSITION_OVERRIDES.put(
+              uuid, new WithStackTrace<>(encodingPositions, stackTrace));
+      @Nullable WithStackTrace<Coder<Row>> existingCoder = GENERATED_CODERS.get(uuid);
+      if (previousEncodingPositions == null) {
+        if (existingCoder != null) {
+          LOG.error(
+              "Received encoding positions for uuid {} too late after creating RowCoder. Created: {}\n Override: {}",
+              uuid,
+              existingCoder.getStackTrace(),
+              stackTrace);
+        } else {
+          LOG.info("Received encoding positions {} for uuid {}.", encodingPositions, uuid);
+        }
+      } else if (!previousEncodingPositions.getValue().equals(encodingPositions)) {
+        if (existingCoder == null) {
+          LOG.error(
+              "Received differing encoding positions for uuid {} before coder creation. Was {} at {}\n Now {} at {}",
+              uuid,
+              previousEncodingPositions.getValue(),
+              encodingPositions,
+              previousEncodingPositions.getStackTrace(),
+              stackTrace);
+        } else {
+          LOG.error(
+              "Received differing encoding positions for uuid {} after coder creation at {}\n. "
+                  + "Was {} at {}\n Now {} at {}\n",
+              uuid,
+              existingCoder.getStackTrace(),
+              previousEncodingPositions.getValue(),
+              encodingPositions,
+              previousEncodingPositions.getStackTrace(),
+              stackTrace);
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static void clearRowCoderCache() {
+    synchronized (setOverridesLock) {

Review Comment:
   `GENERATED_CODERS` is already a concurrent map, so it usually does not need to be wrapped in a `synchronized` block. I see `setOverridesLock` is also used in other places (it guards `ENCODING_POSITION_OVERRIDES`), which is probably the reason. If so, consider adding a comment to note this.
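   A minimal sketch of the invariant such a comment would describe. The names are hypothetical: `TwoMapCache`, `overrides` and `created` stand in for `ENCODING_POSITION_OVERRIDES` and `GENERATED_CODERS`. The concurrent map makes each individual `get`/`put` thread-safe. A check-then-act that spans both maps still needs one shared lock, and the lock-free read is only a fast path.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   /**
    * Hypothetical sketch, not Beam's API: two maps that must stay consistent with each
    * other, so every write to either map happens while holding the same lock.
    */
   final class TwoMapCache {
     private final Object lock = new Object();

     // Guarded by lock; a plain HashMap is fine because it is only touched under lock.
     private final Map<String, Integer> overrides = new HashMap<>();

     // Concurrent so getOrCreate can do a lock-free lookup; writes still hold lock.
     private final Map<String, String> created = new ConcurrentHashMap<>();

     /** Records an override; returns false if the value was already created without it. */
     boolean setOverride(String key, int value) {
       synchronized (lock) {
         overrides.put(key, value);
         // Reading created under the same lock means no creation can race with this check.
         return !created.containsKey(key);
       }
     }

     String getOrCreate(String key) {
       String existing = created.get(key); // lock-free fast path
       if (existing != null) {
         return existing;
       }
       synchronized (lock) {
         // Re-check under the lock, then build from the override visible at this moment.
         return created.computeIfAbsent(key, k -> k + "@" + overrides.getOrDefault(k, -1));
       }
     }
   }
   ```

   Without the shared lock, an override could be recorded between the `overrides` lookup and the `created` insert, and `setOverride` would report success for a value that was never used.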



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java:
##########
@@ -109,30 +113,121 @@ public abstract class RowCoderGenerator {
   private static final String CODERS_FIELD_NAME = "FIELD_CODERS";
   private static final String POSITIONS_FIELD_NAME = "FIELD_ENCODING_POSITIONS";
 
-  // Cache for Coder class that are already generated.
-  private static final Map<UUID, Coder<Row>> GENERATED_CODERS = Maps.newConcurrentMap();
-  private static final Map<UUID, Map<String, Integer>> ENCODING_POSITION_OVERRIDES =
+  static class WithStackTrace<T> {
+    private final T value;
+    private final String stackTrace;
+
+    public WithStackTrace(T value, String stackTrace) {
+      this.value = value;
+      this.stackTrace = stackTrace;
+    }
+
+    public T getValue() {
+      return value;
+    }
+
+    public String getStackTrace() {
+      return stackTrace;
+    }
+  }
+
+  // Cache for Coder class that are already generated. Coders are added with setOverridesLock held.
+  private static final Map<UUID, WithStackTrace<Coder<Row>>> GENERATED_CODERS =
       Maps.newConcurrentMap();
 
+  @GuardedBy("setOverridesLock")
+  private static final Map<UUID, WithStackTrace<Map<String, Integer>>> ENCODING_POSITION_OVERRIDES =
+      Maps.newHashMap();
+
+  private static final Object setOverridesLock = new Object();
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowCoderGenerator.class);
+
+  private static String getStackTrace() {
+    StringBuilder builder = new StringBuilder();
+    for (StackTraceElement e : new Throwable().getStackTrace()) {
+      builder.append("  at ").append(e).append("\n");
+    }
+    return builder.toString();
+  }
+
   public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) {
-    ENCODING_POSITION_OVERRIDES.put(uuid, encodingPositions);
+    final String stackTrace = getStackTrace();
+    synchronized (setOverridesLock) {
+      @Nullable
+      WithStackTrace<Map<String, Integer>> previousEncodingPositions =
+          ENCODING_POSITION_OVERRIDES.put(
+              uuid, new WithStackTrace<>(encodingPositions, stackTrace));
+      @Nullable WithStackTrace<Coder<Row>> existingCoder = GENERATED_CODERS.get(uuid);
+      if (previousEncodingPositions == null) {
+        if (existingCoder != null) {
+          LOG.error(
+              "Received encoding positions for uuid {} too late after creating RowCoder. Created: {}\n Override: {}",

Review Comment:
   I also found it useful to print stack traces in error/warning logs in #31924. However, this sometimes made log entries extremely long, and they got truncated in Dataflow logging. Here, for example, two stack traces are printed, so the entry could be truncated before the second stack trace reaches the point of interest.
   
   The helper function `sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.arrayToNewlines` is used to truncate a stack trace to a finite number of lines, if you find it useful here.
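   A minimal sketch of the same idea, for illustration only. `StackTraces`, `truncatedStackTrace` and `formatFrames` are hypothetical names, not Beam's `StringUtils` API, and no particular signature of `arrayToNewlines` is assumed. It caps the number of frames so that each of the two stack traces in one log entry stays short. It keeps the `"  at "` prefix used by `getStackTrace()` in the diff.

   ```java
   /** Hypothetical helper, not Beam's StringUtils.arrayToNewlines. */
   final class StackTraces {
     private StackTraces() {}

     /**
      * Formats at most maxLines frames of the current stack. Frame 0 is this method
      * itself, because the Throwable is created here.
      */
     static String truncatedStackTrace(int maxLines) {
       return formatFrames(new Throwable().getStackTrace(), maxLines);
     }

     /** Writes up to maxLines frames, one per line, then a count of omitted frames. */
     static String formatFrames(StackTraceElement[] frames, int maxLines) {
       StringBuilder builder = new StringBuilder();
       int shown = Math.min(Math.max(maxLines, 0), frames.length);
       for (int i = 0; i < shown; i++) {
         builder.append("  at ").append(frames[i]).append('\n');
       }
       if (frames.length > shown) {
         builder.append("  ... ").append(frames.length - shown).append(" more\n");
       }
       return builder.toString();
     }
   }
   ```

   Capping each trace bounds the total length of a `LOG.error` entry that prints two traces. The cost is losing the outermost frames, which are usually the least interesting ones here.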



-- 
This is an automated message from the Apache Git Service.