stevenzwu commented on code in PR #7360:
URL: https://github.com/apache/iceberg/pull/7360#discussion_r1260517779


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+

Review Comment:
   nit: remove empty line



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");

Review Comment:
   nit: `Failed` (instead of `Fail`)



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");

Review Comment:
   nit: `Closed data statistics coordinator: {}`



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);

Review Comment:
   This is redundant with line 104 above. Also, when the catch block runs, it logs an error 
message and then this "successfully closed" message is still logged afterwards. 



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {

Review Comment:
   Is a simple `shutdown` good enough? Do we need this try-catch block?
   
   I searched the existing Iceberg code, and it all seems to just do a simple shutdown. 
If we decide to remove the try block, the other comments won't be applicable anymore.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator has not started yet.");
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    if 
(globalStatisticsAggregatorTracker.receiveDataStatisticEventAndCheckCompletion(
+        subtask, event)) {
+      GlobalStatisticsAggregator<D, S> lastCompletedAggregator =
+          globalStatisticsAggregatorTracker.lastCompletedAggregator();
+      sendDataStatisticsToSubtasks(
+          lastCompletedAggregator.checkpointId(), 
lastCompletedAggregator.dataStatistics());
+    }
+  }
+
+  private void sendDataStatisticsToSubtasks(
+      long checkpointId, DataStatistics<D, S> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<D, S> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics, 
statisticsSerializer);
+          int parallelism = parallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format("Failed to send global data statistics for checkpoint 
%d", checkpointId));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          Preconditions.checkArgument(event instanceof DataStatisticsEvent);
+          handleDataStatisticRequest(subtask, ((DataStatisticsEvent<D, S>) 
event));
+        },
+        String.format(
+            "handling operator event %s from subtask %d (#%d)",
+            event.getClass(), subtask, attemptNumber));
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Taking a state snapshot on data statistics coordinator {} for 
checkpoint {}",
+              operatorName,
+              checkpointId);
+          resultFuture.complete(
+              
globalStatisticsAggregatorTracker.serializeLastCompletedAggregator());
+        },
+        String.format("taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+      throws Exception {
+    if (started) {
+      throw new IllegalStateException(
+          "The coordinator can only be reset if it was not yet started");
+    }
+
+    if (checkpointData == null) {
+      return;
+    }
+
+    LOG.info(
+        "Restoring data statistic coordinator {} from checkpoint {}.", 
operatorName, checkpointId);
+
+    
globalStatisticsAggregatorTracker.deserializeLastCompletedAggregator(checkpointData);
+  }
+
+  @Override
+  public void subtaskReset(int subtask, long checkpointId) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Resetting subtask {} to checkpoint {} for data statistics {}.",
+              subtask,
+              checkpointId,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.reset(subtask);
+        },
+        String.format("handling subtask %d recovery to checkpoint %d", 
subtask, checkpointId));
+  }
+
+  @Override
+  public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable 
Throwable reason) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Unregistering gateway after failure for subtask {} (#{}) of 
data statistic {}.",
+              subtask,
+              attemptNumber,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber);
+        },
+        String.format("handling subtask %d (#%d) failure", subtask, 
attemptNumber));
+  }
+
+  @Override
+  public void executionAttemptReady(int subtask, int attemptNumber, 
SubtaskGateway gateway) {
+    Preconditions.checkArgument(subtask == gateway.getSubtask());
+    Preconditions.checkArgument(attemptNumber == 
gateway.getExecution().getAttemptNumber());
+    runInCoordinatorThread(
+        () -> {
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.registerSubtaskGateway(gateway);
+        },
+        String.format(
+            "making event gateway to subtask %d (#%d) available", subtask, 
attemptNumber));
+  }
+
+  @VisibleForTesting
+  GlobalStatisticsAggregatorTracker<D, S> globalStatisticsAggregatorTracker() {
+    return globalStatisticsAggregatorTracker;
+  }
+
+  private static class SubtaskGateways {
+    private final Map<Integer, SubtaskGateway>[] gateways;
+
+    private SubtaskGateways(int parallelism) {
+      gateways = new Map[parallelism];
+
+      for (int i = 0; i < parallelism; ++i) {
+        gateways[i] = Maps.newHashMap();
+      }
+    }
+
+    private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway 
gateway) {
+      int subtaskIndex = gateway.getSubtask();
+      int attemptNumber = gateway.getExecution().getAttemptNumber();
+      Preconditions.checkState(
+          !gateways[subtaskIndex].containsKey(attemptNumber),
+          "Already have a subtask gateway for %d (#%d).",
+          subtaskIndex,
+          attemptNumber);
+      LOG.debug("Register gateway for subtask {} attempt {}", subtaskIndex, 
attemptNumber);

Review Comment:
   It would be helpful to include the operator name in this log message.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);

Review Comment:
   nit: same comment as before about not using the class name in the log message.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregatorTracker.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GlobalStatisticsAggregator is used by {@link DataStatisticsCoordinator} to 
collect {@link
+ * DataStatistics} from {@link DataStatisticsOperator} subtasks for specific 
checkpoint. It stores
+ * the merged {@link DataStatistics} result and uses set to keep a record of 
all reported subtasks.
+ */
+@Internal
+class GlobalStatisticsAggregatorTracker<D extends DataStatistics<D, S>, S> 
implements Serializable {

Review Comment:
   Would `GlobalStatisticsTracker` be a better name?



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);

Review Comment:
   nit: maybe change the message format to `... coordinator: {}`



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");

Review Comment:
   nit: `Failed` instead of `Fail`



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorProvider.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.ThreadFactory;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * DataStatisticsCoordinatorProvider provides the method to create new {@link
+ * DataStatisticsCoordinator} and defines {@link 
CoordinatorExecutorThreadFactory} to create new
+ * thread for {@link DataStatisticsCoordinator} to execute task
+ */
+public class DataStatisticsCoordinatorProvider<D extends DataStatistics<D, S>, 
S>
+    extends RecreateOnResetOperatorCoordinator.Provider {
+
+  private final String operatorName;
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  public DataStatisticsCoordinatorProvider(
+      String operatorName,
+      OperatorID operatorID,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    super(operatorID);
+    this.operatorName = operatorName;
+    this.statisticsSerializer = statisticsSerializer;
+  }
+
+  @Override
+  public OperatorCoordinator getCoordinator(OperatorCoordinator.Context 
context) {
+    return new DataStatisticsCoordinator<>(operatorName, context, 
statisticsSerializer);
+  }
+
+  static class CoordinatorExecutorThreadFactory

Review Comment:
   Does it make sense to move this class inside `DataStatisticsCoordinator`, since it is only used there?



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;

Review Comment:
   This class member doesn't seem to be used.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator has not started yet.");
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    if 
(globalStatisticsAggregatorTracker.receiveDataStatisticEventAndCheckCompletion(
+        subtask, event)) {
+      GlobalStatisticsAggregator<D, S> lastCompletedAggregator =
+          globalStatisticsAggregatorTracker.lastCompletedAggregator();
+      sendDataStatisticsToSubtasks(
+          lastCompletedAggregator.checkpointId(), 
lastCompletedAggregator.dataStatistics());
+    }
+  }
+
+  private void sendDataStatisticsToSubtasks(
+      long checkpointId, DataStatistics<D, S> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<D, S> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics, 
statisticsSerializer);
+          int parallelism = parallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format("Failed to send global data statistics for checkpoint 
%d", checkpointId));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          Preconditions.checkArgument(event instanceof DataStatisticsEvent);
+          handleDataStatisticRequest(subtask, ((DataStatisticsEvent<D, S>) 
event));
+        },
+        String.format(
+            "handling operator event %s from subtask %d (#%d)",
+            event.getClass(), subtask, attemptNumber));
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Taking a state snapshot on data statistics coordinator {} for 
checkpoint {}",
+              operatorName,
+              checkpointId);
+          resultFuture.complete(
+              
globalStatisticsAggregatorTracker.serializeLastCompletedAggregator());
+        },
+        String.format("taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+      throws Exception {
+    if (started) {
+      throw new IllegalStateException(
+          "The coordinator can only be reset if it was not yet started");
+    }
+
+    if (checkpointData == null) {
+      return;
+    }
+
+    LOG.info(
+        "Restoring data statistic coordinator {} from checkpoint {}.", 
operatorName, checkpointId);
+
+    
globalStatisticsAggregatorTracker.deserializeLastCompletedAggregator(checkpointData);
+  }
+
+  @Override
+  public void subtaskReset(int subtask, long checkpointId) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Resetting subtask {} to checkpoint {} for data statistics {}.",
+              subtask,
+              checkpointId,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.reset(subtask);
+        },
+        String.format("handling subtask %d recovery to checkpoint %d", 
subtask, checkpointId));
+  }
+
+  @Override
+  public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable 
Throwable reason) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Unregistering gateway after failure for subtask {} (#{}) of 
data statistic {}.",
+              subtask,
+              attemptNumber,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber);
+        },
+        String.format("handling subtask %d (#%d) failure", subtask, 
attemptNumber));
+  }
+
+  @Override
+  public void executionAttemptReady(int subtask, int attemptNumber, 
SubtaskGateway gateway) {
+    Preconditions.checkArgument(subtask == gateway.getSubtask());
+    Preconditions.checkArgument(attemptNumber == 
gateway.getExecution().getAttemptNumber());
+    runInCoordinatorThread(
+        () -> {
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.registerSubtaskGateway(gateway);
+        },
+        String.format(
+            "making event gateway to subtask %d (#%d) available", subtask, 
attemptNumber));
+  }
+
+  @VisibleForTesting
+  GlobalStatisticsAggregatorTracker<D, S> globalStatisticsAggregatorTracker() {
+    return globalStatisticsAggregatorTracker;
+  }
+
+  private static class SubtaskGateways {
+    private final Map<Integer, SubtaskGateway>[] gateways;
+
+    private SubtaskGateways(int parallelism) {
+      gateways = new Map[parallelism];
+
+      for (int i = 0; i < parallelism; ++i) {
+        gateways[i] = Maps.newHashMap();
+      }
+    }
+
+    private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway 
gateway) {
+      int subtaskIndex = gateway.getSubtask();
+      int attemptNumber = gateway.getExecution().getAttemptNumber();
+      Preconditions.checkState(
+          !gateways[subtaskIndex].containsKey(attemptNumber),
+          "Already have a subtask gateway for %d (#%d).",
+          subtaskIndex,
+          attemptNumber);
+      LOG.debug("Register gateway for subtask {} attempt {}", subtaskIndex, 
attemptNumber);
+      gateways[subtaskIndex].put(attemptNumber, gateway);
+    }
+
+    private void unregisterSubtaskGateway(int subtaskIndex, int attemptNumber) 
{
+      LOG.debug("Unregister gateway for subtask {} attempt {}", subtaskIndex, 
attemptNumber);
+      gateways[subtaskIndex].remove(attemptNumber);
+    }
+
+    private OperatorCoordinator.SubtaskGateway getOnlyGatewayAndCheckReady(int 
subtaskIndex) {

Review Comment:
   nit: the method name could be just `getGateway`; the readiness check can be an 
internal detail. I know this is how Flink names this method, but we don't have to 
use the same method name.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator has not started yet.");
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    if 
(globalStatisticsAggregatorTracker.receiveDataStatisticEventAndCheckCompletion(

Review Comment:
   This method's name indicates that it does two things at once. Should it be 
split into two separate methods, `receiveDataStatisticsEvent` and 
`isCurrentTrackingComplete`?
   
   The logic could look like this:
   ```
   tracker.receiveDataStatisticsEvent(subtask, event);
   if (tracker.isCurrentTrackingComplete()) {
      GlobalStatisticsAggregator<D, S> lastCompletedAggregator = 
tracker.completeLastTrackingCycle();
   }
   ```



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsEvent.java:
##########
@@ -30,28 +34,47 @@
 class DataStatisticsEvent<D extends DataStatistics<D, S>, S> implements 
OperatorEvent {
 
   private static final long serialVersionUID = 1L;
-
   private final long checkpointId;
-  private final DataStatistics<D, S> dataStatistics;
+  private final byte[] dataStatisticsBytes;
 
-  DataStatisticsEvent(long checkpointId, DataStatistics<D, S> dataStatistics) {
+  DataStatisticsEvent(
+      long checkpointId,
+      DataStatistics<D, S> dataStatistics,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
     this.checkpointId = checkpointId;
-    this.dataStatistics = dataStatistics;
+    DataOutputSerializer out = new DataOutputSerializer(64);
+    try {
+      statisticsSerializer.serialize(dataStatistics, out);
+      this.dataStatisticsBytes = out.getCopyOfBuffer();
+    } catch (IOException e) {
+      throw new IllegalStateException("Fail to serialize data statistics", e);
+    }
   }
 
   long checkpointId() {
     return checkpointId;
   }
 
-  DataStatistics<D, S> dataStatistics() {
+  @SuppressWarnings("unchecked")
+  D dataStatistics(TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {

Review Comment:
   This class has a slightly odd structure: (1) a serializer is passed to the 
constructor to serialize the object to `byte[]`, and (2) on the read path, a 
serializer is passed in again to deserialize it.
   
   Alternatively, we could keep this class as a simple POJO (holding only the 
`byte[]`) and move serialization and deserialization outside of it, perhaps into 
a util class?



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator has not started yet.");
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    if 
(globalStatisticsAggregatorTracker.receiveDataStatisticEventAndCheckCompletion(
+        subtask, event)) {
+      GlobalStatisticsAggregator<D, S> lastCompletedAggregator =
+          globalStatisticsAggregatorTracker.lastCompletedAggregator();
+      sendDataStatisticsToSubtasks(
+          lastCompletedAggregator.checkpointId(), 
lastCompletedAggregator.dataStatistics());
+    }
+  }
+
+  private void sendDataStatisticsToSubtasks(
+      long checkpointId, DataStatistics<D, S> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<D, S> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics, 
statisticsSerializer);
+          int parallelism = parallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format("Failed to send global data statistics for checkpoint 
%d", checkpointId));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          Preconditions.checkArgument(event instanceof DataStatisticsEvent);
+          handleDataStatisticRequest(subtask, ((DataStatisticsEvent<D, S>) 
event));
+        },
+        String.format(
+            "handling operator event %s from subtask %d (#%d)",
+            event.getClass(), subtask, attemptNumber));
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Taking a state snapshot on data statistics coordinator {} for 
checkpoint {}",
+              operatorName,
+              checkpointId);
+          resultFuture.complete(
+              
globalStatisticsAggregatorTracker.serializeLastCompletedAggregator());
+        },
+        String.format("taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+      throws Exception {
+    if (started) {

Review Comment:
   nit: use `Preconditions.checkState(!started, ...)` here instead of the manual `if`/`throw`.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);

Review Comment:
   nit: Iceberg error messages don't use class names, as they are not user-friendly. Just use `data statistics coordinator`, as in other places.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator has not started yet.");
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    if 
(globalStatisticsAggregatorTracker.receiveDataStatisticEventAndCheckCompletion(
+        subtask, event)) {
+      GlobalStatisticsAggregator<D, S> lastCompletedAggregator =
+          globalStatisticsAggregatorTracker.lastCompletedAggregator();
+      sendDataStatisticsToSubtasks(
+          lastCompletedAggregator.checkpointId(), 
lastCompletedAggregator.dataStatistics());
+    }
+  }
+
+  private void sendDataStatisticsToSubtasks(
+      long checkpointId, DataStatistics<D, S> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<D, S> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics, 
statisticsSerializer);
+          int parallelism = parallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format("Failed to send global data statistics for checkpoint 
%d", checkpointId));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          Preconditions.checkArgument(event instanceof DataStatisticsEvent);
+          handleDataStatisticRequest(subtask, ((DataStatisticsEvent<D, S>) 
event));
+        },
+        String.format(
+            "handling operator event %s from subtask %d (#%d)",
+            event.getClass(), subtask, attemptNumber));
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Taking a state snapshot on data statistics coordinator {} for 
checkpoint {}",
+              operatorName,
+              checkpointId);
+          resultFuture.complete(
+              
globalStatisticsAggregatorTracker.serializeLastCompletedAggregator());
+        },
+        String.format("taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+      throws Exception {
+    if (started) {
+      throw new IllegalStateException(
+          "The coordinator can only be reset if it was not yet started");
+    }
+
+    if (checkpointData == null) {
+      return;
+    }
+
+    LOG.info(
+        "Restoring data statistic coordinator {} from checkpoint {}.", 
operatorName, checkpointId);
+
+    
globalStatisticsAggregatorTracker.deserializeLastCompletedAggregator(checkpointData);
+  }
+
+  @Override
+  public void subtaskReset(int subtask, long checkpointId) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Resetting subtask {} to checkpoint {} for data statistics {}.",
+              subtask,
+              checkpointId,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.reset(subtask);
+        },
+        String.format("handling subtask %d recovery to checkpoint %d", 
subtask, checkpointId));
+  }
+
+  @Override
+  public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable 
Throwable reason) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Unregistering gateway after failure for subtask {} (#{}) of 
data statistic {}.",
+              subtask,
+              attemptNumber,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber);
+        },
+        String.format("handling subtask %d (#%d) failure", subtask, 
attemptNumber));
+  }
+
+  @Override
+  public void executionAttemptReady(int subtask, int attemptNumber, 
SubtaskGateway gateway) {
+    Preconditions.checkArgument(subtask == gateway.getSubtask());
+    Preconditions.checkArgument(attemptNumber == 
gateway.getExecution().getAttemptNumber());
+    runInCoordinatorThread(
+        () -> {
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.registerSubtaskGateway(gateway);
+        },
+        String.format(
+            "making event gateway to subtask %d (#%d) available", subtask, 
attemptNumber));
+  }
+
+  @VisibleForTesting
+  GlobalStatisticsAggregatorTracker<D, S> globalStatisticsAggregatorTracker() {
+    return globalStatisticsAggregatorTracker;
+  }
+
+  private static class SubtaskGateways {
+    private final Map<Integer, SubtaskGateway>[] gateways;
+
+    private SubtaskGateways(int parallelism) {
+      gateways = new Map[parallelism];
+
+      for (int i = 0; i < parallelism; ++i) {
+        gateways[i] = Maps.newHashMap();
+      }
+    }
+
+    private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway 
gateway) {
+      int subtaskIndex = gateway.getSubtask();
+      int attemptNumber = gateway.getExecution().getAttemptNumber();
+      Preconditions.checkState(
+          !gateways[subtaskIndex].containsKey(attemptNumber),
+          "Already have a subtask gateway for %d (#%d).",
+          subtaskIndex,
+          attemptNumber);
+      LOG.debug("Register gateway for subtask {} attempt {}", subtaskIndex, 
attemptNumber);
+      gateways[subtaskIndex].put(attemptNumber, gateway);
+    }
+
+    private void unregisterSubtaskGateway(int subtaskIndex, int attemptNumber) 
{
+      LOG.debug("Unregister gateway for subtask {} attempt {}", subtaskIndex, 
attemptNumber);
+      gateways[subtaskIndex].remove(attemptNumber);
+    }
+
+    private OperatorCoordinator.SubtaskGateway getOnlyGatewayAndCheckReady(int 
subtaskIndex) {
+      Preconditions.checkState(
+          gateways[subtaskIndex].size() > 0,
+          "Subtask %d is not ready yet to receive events.",

Review Comment:
   Include the operator name in the error message.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -129,9 +136,14 @@ public void snapshotState(StateSnapshotContext context) 
throws Exception {
       globalStatisticsState.add(globalStatistics);
     }
 
-    // For now, we make it simple to send globalStatisticsState at checkpoint
+    // For now, we make it simple to send localStatistics at checkpoint
     operatorEventGateway.sendEventToCoordinator(
-        new DataStatisticsEvent<>(checkpointId, localStatistics));
+        new DataStatisticsEvent(checkpointId, localStatistics, 
statisticsSerializer));
+    LOG.debug(
+        "Send local statistics {} from subtask {} at checkpoint {} to 
coordinator",

Review Comment:
   nit: include the operator name.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -121,6 +124,10 @@ public void snapshotState(StateSnapshotContext context) 
throws Exception {
         checkpointId,
         subTaskId);
 
+    // Send global statistics to partitioners at checkpoint to update data 
distribution at the same

Review Comment:
   nit: this comment seems unnecessary. Readers may take it as applying only to the single line below, but it actually applies to multiple lines below.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregatorTracker.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GlobalStatisticsAggregator is used by {@link DataStatisticsCoordinator} to 
collect {@link
+ * DataStatistics} from {@link DataStatisticsOperator} subtasks for specific 
checkpoint. It stores
+ * the merged {@link DataStatistics} result and uses set to keep a record of 
all reported subtasks.
+ */
+@Internal
+class GlobalStatisticsAggregatorTracker<D extends DataStatistics<D, S>, S> 
implements Serializable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GlobalStatisticsAggregatorTracker.class);
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
90;
+  private final transient TypeSerializer<DataStatistics<D, S>> 
statisticsSerializer;
+  private final transient int parallelism;
+  private transient volatile GlobalStatisticsAggregator<D, S> 
inProgressAggregator;
+  private volatile GlobalStatisticsAggregator<D, S> lastCompletedAggregator;
+
+  GlobalStatisticsAggregatorTracker(
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer, int 
parallelism) {
+    this.statisticsSerializer = statisticsSerializer;
+    this.parallelism = parallelism;
+  }
+
+  boolean receiveDataStatisticEventAndCheckCompletion(

Review Comment:
   I am wondering if this method signature would be clearer. I find a class name like `AggregatorTracker` a little weird. With this signature, we could probably just call this class `Aggregator`; it would then only need to maintain the in-progress aggregation, and we could remove the other class (currently named `Aggregator`).
   
   ```
   /**
    * Returns the globally aggregated statistics if local statistics have been received from all
    * subtasks for the current checkpoint ID, or if the in-progress aggregation is for a previous
    * checkpoint and enough subtasks (at least 90%) have reported.
    * Returns null otherwise (no aggregation has completed).
    */
   DataStatistics<D, S> merge(int subtask, DataStatisticsEvent<D, S> event);
   ```
   
   Is this pseudocode logic a little clearer?
   ```
   if (inprogressCheckpointId != null && inprogressCheckpointId > 
eventCheckpointId) {
     // ignore the event from older checkpoint
     return null;
   }
   
   DataStatistics completedStatistics = null;
   
   if (inprogressCheckpointId != null && inprogressCheckpointId < 
eventCheckpointId) {
     if (over 90%) {
       completedStatistics = inprogressStatistics;
     } 
     inprogressCheckpointId = null;
     inprogressStatistics = null;
   }
   
   if (inprogressStatistics == null) {
     inprogressCheckpointId = eventCheckpointId;
     inprogressStatistics = new DataStatistics();
   }
   
   inprogressStatistics.merge(localStatistics);
   if (all subtasks reported) {
     completedStatistics =  inprogressStatistics;
     inprogressCheckpointId = null;
     inprogressStatistics = null; 
   }
   
   return completedStatistics;
   ```



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregatorTracker.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GlobalStatisticsAggregator is used by {@link DataStatisticsCoordinator} to 
collect {@link
+ * DataStatistics} from {@link DataStatisticsOperator} subtasks for specific 
checkpoint. It stores
+ * the merged {@link DataStatistics} result and uses set to keep a record of 
all reported subtasks.
+ */
+@Internal
+class GlobalStatisticsAggregatorTracker<D extends DataStatistics<D, S>, S> 
implements Serializable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GlobalStatisticsAggregatorTracker.class);
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
90;
+  private final transient TypeSerializer<DataStatistics<D, S>> 
statisticsSerializer;
+  private final transient int parallelism;
+  private transient volatile GlobalStatisticsAggregator<D, S> 
inProgressAggregator;
+  private volatile GlobalStatisticsAggregator<D, S> lastCompletedAggregator;
+
+  GlobalStatisticsAggregatorTracker(
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer, int 
parallelism) {
+    this.statisticsSerializer = statisticsSerializer;
+    this.parallelism = parallelism;
+  }
+
+  /**
+   * Merges one subtask's data statistics event into the in-progress global aggregation.
+   *
+   * <p>Events whose checkpoint is at or before the last completed aggregation, or older than the
+   * in-progress aggregation, are ignored. An event for a newer checkpoint closes the in-progress
+   * aggregation. That aggregation is promoted to the last completed aggregation if at least
+   * {@code EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE} percent of {@code parallelism} subtasks
+   * reported; otherwise it is dropped. In both cases a new aggregation is started for the newer
+   * checkpoint. An aggregation is also promoted as soon as {@code parallelism} subtasks have
+   * reported.
+   *
+   * @param subtask the subtask the event came from
+   * @param event the data statistics event sent by that subtask
+   * @return true if this call promoted an aggregation to the last completed aggregation
+   */
+  boolean receiveDataStatisticEventAndCheckCompletion(
+      int subtask, DataStatisticsEvent<D, S> event) {
+    long checkpointId = event.checkpointId();
+
+    if (lastCompletedAggregator != null
+        && lastCompletedAggregator.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. "
+              + "Ignore the event from subtask {} for checkpoint {}",
+          lastCompletedAggregator.checkpointId(),
+          subtask,
+          checkpointId);
+      return false;
+    }
+
+    if (inProgressAggregator == null) {
+      inProgressAggregator = new GlobalStatisticsAggregator<>(checkpointId, statisticsSerializer);
+    }
+
+    boolean completed = false;
+    if (inProgressAggregator.checkpointId() < checkpointId) {
+      // An event for a newer checkpoint closes the current aggregation window: keep it if enough
+      // subtasks reported, otherwise drop it.
+      double receivedPercentage =
+          (double) inProgressAggregator.aggregatedSubtasksCount() / parallelism * 100;
+      if (receivedPercentage >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        lastCompletedAggregator = inProgressAggregator;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. "
+                + "It's more than the expected percentage {}. "
+                + "Update last completed aggregator to be {}.",
+            inProgressAggregator.aggregatedSubtasksCount(),
+            parallelism,
+            inProgressAggregator.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            lastCompletedAggregator.dataStatistics());
+        completed = true;
+      } else {
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for checkpoint {}. "
+                + "It's less than the expected percentage {}. "
+                + "Dropping the incomplete aggregate data statistics and starting collecting "
+                + "data statistics from new checkpoint {}",
+            inProgressAggregator.aggregatedSubtasksCount(),
+            parallelism,
+            inProgressAggregator.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            checkpointId);
+      }
+      // Replaced only after logging, because both log calls read the previous aggregator.
+      inProgressAggregator = new GlobalStatisticsAggregator<>(checkpointId, statisticsSerializer);
+    } else if (inProgressAggregator.checkpointId() > checkpointId) {
+      LOG.debug(
+          "Expect data statistics for checkpoint {}, "
+              + "but receive event from older checkpoint {}. Ignore it.",
+          inProgressAggregator.checkpointId(),
+          checkpointId);
+      return false;
+    }
+
+    inProgressAggregator.mergeDataStatistic(
+        subtask, event.checkpointId(), event.dataStatistics(statisticsSerializer));
+
+    if (inProgressAggregator.aggregatedSubtasksCount() == parallelism) {
+      // All expected subtasks reported, so there is no need to wait for a newer checkpoint.
+      lastCompletedAggregator = inProgressAggregator;
+      LOG.info(
+          "Received data statistics from all {} operators for checkpoint {}. "
+              + "Update last completed aggregator to be {}.",
+          parallelism,
+          inProgressAggregator.checkpointId(),
+          lastCompletedAggregator.dataStatistics());
+      inProgressAggregator = null;
+      completed = true;
+    }
+
+    return completed;
+  }
+
+  /**
+   * Returns the aggregation that is still collecting statistics from subtasks.
+   *
+   * @return the in-progress aggregator, or null when no aggregation is in progress
+   */
+  @VisibleForTesting
+  GlobalStatisticsAggregator<D, S> inProgressAggregator() {
+    return this.inProgressAggregator;
+  }
+
+  /**
+   * Returns the most recently completed aggregation.
+   *
+   * @return the last completed aggregator, or null when no aggregation has completed yet
+   */
+  GlobalStatisticsAggregator<D, S> lastCompletedAggregator() {
+    return this.lastCompletedAggregator;
+  }
+
+  byte[] serializeLastCompletedAggregator() throws IOException {

Review Comment:
   serializer can't be moved to a separate util class



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to