stevenzwu commented on code in PR #7269:
URL: https://github.com/apache/iceberg/pull/7269#discussion_r1157713657


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -97,8 +97,15 @@ public void open() throws Exception {
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void handleOperatorEvent(OperatorEvent evt) {
-    // TODO: receive event with aggregated statistics from coordinator and update globalStatistics
+    if (evt instanceof DataStatisticsEvent) {
+      globalStatistics = ((DataStatisticsEvent<K>) evt).dataStatistics();
+      output.collect(
+          new StreamRecord<>(DataStatisticsOrRecord.fromDataStatistics(globalStatistics)));
+    } else {
+      throw new IllegalStateException("Received unexpected operator event " + evt);

Review Comment:
   nit: Iceberg typically uses `Preconditions` for this type of check.

   Also, we probably don't want to include the event string in the error message, as it can include the large statistics map. Just the class name should be sufficient.
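
A minimal sketch of the suggestion, not the PR's actual code. `OperatorEvent`, `DataStatisticsEvent`, and `OtherEvent` here are hypothetical stand-ins for the Flink and PR types, and `checkArgument` is a local helper that only imitates a Guava-style `Preconditions.checkArgument`. In Iceberg the relocated `Preconditions` class would presumably be used instead. Whether `checkArgument` or `checkState` fits better is also an assumption.

```java
import java.util.Collections;
import java.util.Map;

final class OperatorEventCheck {
  // Hypothetical stand-in for Flink's OperatorEvent interface.
  interface OperatorEvent {}

  // Hypothetical stand-in for the PR's DataStatisticsEvent; toString() may be large.
  static final class DataStatisticsEvent implements OperatorEvent {
    final Map<String, Long> statistics;

    DataStatisticsEvent(Map<String, Long> statistics) {
      this.statistics = statistics;
    }

    @Override
    public String toString() {
      return "DataStatisticsEvent" + statistics;
    }
  }

  // Hypothetical unexpected event whose string form carries a large payload.
  static final class OtherEvent implements OperatorEvent {
    @Override
    public String toString() {
      return "OtherEvent{huge payload ...}";
    }
  }

  // Local helper imitating a Guava-style precondition check (assumption, not Iceberg's class).
  static void checkArgument(boolean condition, String template, Object arg) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(template, arg));
    }
  }

  // Validates the event type and reports only the class name on failure,
  // so the statistics map never ends up in the error message.
  static Map<String, Long> handleOperatorEvent(OperatorEvent evt) {
    checkArgument(
        evt instanceof DataStatisticsEvent,
        "Received unexpected operator event %s",
        evt.getClass().getName());
    return ((DataStatisticsEvent) evt).statistics;
  }

  public static void main(String[] args) {
    System.out.println(
        handleOperatorEvent(new DataStatisticsEvent(Collections.singletonMap("key", 1L))));
    try {
      handleOperatorEvent(new OtherEvent());
    } catch (IllegalArgumentException e) {
      // The message names the event class but omits its toString() payload.
      System.out.println(e.getMessage());
    }
  }
}
```

With this shape the failure message stays small no matter how large the event's string form is, which is the point of logging only the class name.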



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

