guoweiM commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r737186361
##########
File path:
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -224,4 +261,28 @@ public boolean isTerminated() {
return totalRecord == 0 || (hasCriteriaStream &&
totalCriteriaRecord == 0);
}
}
+
+ private static class CheckpointStatus {
+
+ private final long totalHeadParallelism;
+
+ private final List<CompletableFuture<byte[]>> stateFutures = new
ArrayList<>();
+
+ private int notifiedCoordinatorParallelism;
Review comment:
int -> long?
##########
File path:
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -93,27 +98,28 @@ private SharedProgressAligner(
this.executor = Objects.requireNonNull(executor);
this.statusByEpoch = new HashMap<>();
- this.alignedConsumers = new HashMap<>();
+ this.listeners = new HashMap<>();
+ this.checkpointStatuses = new HashMap<>();
}
public void registerAlignedConsumer(
- OperatorID operatorID, Consumer<GloballyAlignedEvent>
alignedConsumer) {
+ OperatorID operatorID, SharedProgressAlignerListener
alignedConsumer) {
Review comment:
registerAlignedConsumer or registerAlignedListener?
##########
File path:
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java
##########
@@ -45,14 +49,15 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OutputTag;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Executor;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
* The head operators unions the initialized variable stream and the feedback
stream, and
Review comment:
operators -> operator?
##########
File path:
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -31,20 +32,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.apache.flink.util.Preconditions.checkState;
/**
* The progress aligner shared between multiple {@link
HeadOperatorCoordinator}. It maintains the
* information for each round, once one round is aligned, it would notify all
the register
- * consumers.
+ * listenerss.
Review comment:
listeners?
##########
File path:
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -93,27 +98,28 @@ private SharedProgressAligner(
this.executor = Objects.requireNonNull(executor);
this.statusByEpoch = new HashMap<>();
- this.alignedConsumers = new HashMap<>();
+ this.listeners = new HashMap<>();
+ this.checkpointStatuses = new HashMap<>();
}
public void registerAlignedConsumer(
- OperatorID operatorID, Consumer<GloballyAlignedEvent>
alignedConsumer) {
+ OperatorID operatorID, SharedProgressAlignerListener
alignedConsumer) {
runInEventLoop(
- () -> this.alignedConsumers.put(operatorID, alignedConsumer),
- "Register consumer %s",
+ () -> this.listeners.put(operatorID, alignedConsumer),
+ "Register listeners %s",
operatorID.toHexString());
}
public void unregisterConsumer(OperatorID operatorID) {
synchronized (this) {
Review comment:
Why does `unregisterConsumer` need `synchronized` but
`registerAlignedConsumer` does not need?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]