tweise commented on code in PR #269:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/269#discussion_r901111930


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusHelper.java:
##########
@@ -122,4 +144,45 @@ public <T extends CustomResource<?, STATUS>> void 
removeCachedStatus(T resource)
     protected static Tuple2<String, String> getKey(HasMetadata resource) {
         return Tuple2.of(resource.getMetadata().getNamespace(), 
resource.getMetadata().getName());
     }
+
+    public static <S extends CommonStatus<?>> StatusHelper<S> create(
+            KubernetesClient kubernetesClient, 
Collection<FlinkResourceListener> listeners) {
+        BiConsumer<AbstractFlinkResource<?, S>, S> consumer =
+                (resource, previousStatus) -> {
+                    var ctx =
+                            new FlinkResourceListener.StatusUpdateContext() {
+                                @Override
+                                public S getPreviousStatus() {
+                                    return previousStatus;
+                                }
+
+                                @Override
+                                public AbstractFlinkResource<?, S> 
getFlinkResource() {
+                                    return resource;
+                                }
+
+                                @Override
+                                public KubernetesClient getKubernetesClient() {
+                                    return kubernetesClient;
+                                }
+                            };
+
+                    listeners.forEach(
+                            listener -> {
+                                try {
+                                    if (resource instanceof FlinkDeployment) {
+                                        listener.onDeploymentStatusUpdate(ctx);
+                                    } else {
+                                        listener.onSessionJobStatusUpdate(ctx);
+                                    }
+                                } catch (Exception e) {
+                                    LOG.error(

Review Comment:
   Shouldn't this propagate the exception since it is an error that needs to 
fixed vs. swallowed? Otherwise WARN would be more appropriate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to