stevenzwu commented on code in PR #7269:
URL: https://github.com/apache/iceberg/pull/7269#discussion_r1157713790


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -97,8 +97,15 @@ public void open() throws Exception {
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void handleOperatorEvent(OperatorEvent evt) {
-    // TODO: receive event with aggregated statistics from coordinator and update globalStatistics
+    if (evt instanceof DataStatisticsEvent) {
+      globalStatistics = ((DataStatisticsEvent<K>) evt).dataStatistics();
+      output.collect(
+          new StreamRecord<>(DataStatisticsOrRecord.fromDataStatistics(globalStatistics)));
+    } else {
+      throw new IllegalStateException("Received unexpected operator event " + evt);

Review Comment:
   Nit: Iceberg typically uses `Preconditions` for this type of check.
   
   Also, we probably don't want to include the event string in the error message, as it can include the large statistics map. Just the class name should be sufficient.
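   A minimal sketch of what the suggested check could look like. In Iceberg, `Preconditions` is the relocated Guava class (`org.apache.iceberg.relocated.com.google.common.base.Preconditions`); here `checkArgument` is inlined as an approximation of Guava's `checkArgument(boolean, String, Object...)` so the snippet runs standalone. `OperatorEvent`, `DataStatisticsEvent`, and `UnknownEvent` are simplified, hypothetical stand-ins, not the Flink or PR types.

```java
import java.util.HashMap;
import java.util.Map;

public class OperatorEventCheck {
  // Hypothetical stand-in for Flink's OperatorEvent interface.
  interface OperatorEvent {}

  // Hypothetical, simplified stand-in for the PR's DataStatisticsEvent.
  static final class DataStatisticsEvent implements OperatorEvent {
    final Map<String, Long> statistics;

    DataStatisticsEvent(Map<String, Long> statistics) {
      this.statistics = statistics;
    }

    // A toString() like this is what makes putting the whole event in a message risky.
    @Override
    public String toString() {
      return "DataStatisticsEvent{statistics=" + statistics + "}";
    }
  }

  // Hypothetical event type the operator does not expect.
  static final class UnknownEvent implements OperatorEvent {}

  // Approximates Guava's Preconditions.checkArgument(boolean, String, Object...);
  // inlined here so the sketch compiles without Guava.
  static void checkArgument(boolean expression, String template, Object... args) {
    if (!expression) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  static Map<String, Long> globalStatistics;

  static void handleOperatorEvent(OperatorEvent evt) {
    // Only the class name goes into the message; evt.toString() could print the entire map.
    checkArgument(
        evt instanceof DataStatisticsEvent,
        "Received unexpected operator event: %s",
        evt.getClass().getName());
    globalStatistics = ((DataStatisticsEvent) evt).statistics;
  }

  public static void main(String[] args) {
    Map<String, Long> stats = new HashMap<>();
    stats.put("a", 1L);
    handleOperatorEvent(new DataStatisticsEvent(stats));
    System.out.println("stored: " + globalStatistics);

    try {
      handleOperatorEvent(new UnknownEvent());
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

   `checkState` would keep the original `IllegalStateException`; the sketch uses `checkArgument` on the assumption that an unexpected incoming event is better described as a bad argument. Either choice is a judgment call.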



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -126,8 +133,9 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
       globalStatisticsState.add(globalStatistics);

Review Comment:
   Just realized one thing that I missed in the last PR; it can be addressed in a separate PR. We don't want to use Kryo/Java serialization for `DataStatistics`. We need a stable, versioned serializer (e.g. `SimpleVersionedSerializer`). You can find an example in `IcebergEnumeratorStateSerializer`.
   
   You can find more context in https://github.com/apache/iceberg/issues/1698.
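   The point is that checkpointed state should have an explicit, versioned byte layout instead of whatever Kryo produces for the class. A minimal sketch, assuming the statistics can be modeled as a `Map<String, Long>` of key to count (the PR's actual `DataStatistics` type is generic and may differ). `SimpleVersionedSerializer` is declared locally with the same three methods as Flink's `org.apache.flink.core.io.SimpleVersionedSerializer` so the example compiles without Flink; `MapStatisticsSerializer` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class StatisticsSerializerSketch {
  // Local copy of the method contract of Flink's SimpleVersionedSerializer.
  interface SimpleVersionedSerializer<E> {
    int getVersion();

    byte[] serialize(E obj) throws IOException;

    E deserialize(int version, byte[] serialized) throws IOException;
  }

  // Hypothetical serializer for statistics modeled as Map<String, Long> (key -> count).
  static final class MapStatisticsSerializer
      implements SimpleVersionedSerializer<Map<String, Long>> {
    private static final int VERSION = 1;

    @Override
    public int getVersion() {
      return VERSION;
    }

    @Override
    public byte[] serialize(Map<String, Long> stats) throws IOException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(bytes)) {
        // Explicit layout for version 1: entry count, then (key, count) pairs.
        out.writeInt(stats.size());
        for (Map.Entry<String, Long> entry : stats.entrySet()) {
          out.writeUTF(entry.getKey()); // writeUTF limits each key to 65535 encoded bytes
          out.writeLong(entry.getValue());
        }
      }
      return bytes.toByteArray();
    }

    @Override
    public Map<String, Long> deserialize(int version, byte[] serialized) throws IOException {
      // Dispatch on version so a future layout change can still read old checkpoints.
      if (version != VERSION) {
        throw new IOException("Unrecognized serializer version: " + version);
      }
      try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
        int size = in.readInt();
        Map<String, Long> stats = new HashMap<>(size);
        for (int i = 0; i < size; i++) {
          stats.put(in.readUTF(), in.readLong());
        }
        return stats;
      }
    }
  }

  public static void main(String[] args) throws IOException {
    MapStatisticsSerializer serializer = new MapStatisticsSerializer();
    Map<String, Long> stats = new HashMap<>();
    stats.put("a", 3L);
    stats.put("b", 5L);
    byte[] bytes = serializer.serialize(stats);
    Map<String, Long> restored = serializer.deserialize(serializer.getVersion(), bytes);
    System.out.println("round trip equal: " + restored.equals(stats));
  }
}
```

   The version number has to be stored alongside the bytes (for example, Flink's `SimpleVersionedSerialization.writeVersionAndSerialize` helper does this) so that `deserialize` can select the right layout after an upgrade.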



-- 
This is an automated message from the Apache Git Service.